worker

package
v1.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 18, 2025 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Overview

Package worker provides job processing runtime for Spooled queues.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Event

type Event struct {
	Type      EventType
	Timestamp time.Time
	Data      any
}

Event is emitted by the worker during processing.

type EventHandler

type EventHandler func(event Event)

EventHandler is a callback for worker events.

type EventType

type EventType string

Event types for worker events.

const (
	EventWorkerStarted   EventType = "worker:started"
	EventWorkerStopped   EventType = "worker:stopped"
	EventWorkerError     EventType = "worker:error"
	EventJobClaimed      EventType = "job:claimed"
	EventJobStarted      EventType = "job:started"
	EventJobCompleted    EventType = "job:completed"
	EventJobFailed       EventType = "job:failed"
	EventJobProgress     EventType = "job:progress"
	EventJobHeartbeat    EventType = "job:heartbeat"
	EventWorkerHeartbeat EventType = "worker:heartbeat"
)

type JobClaimedData

type JobClaimedData struct {
	JobID     string
	QueueName string
}

JobClaimedData is emitted when a job is claimed.

type JobCompletedData

type JobCompletedData struct {
	JobID     string
	QueueName string
	Result    map[string]any
	Duration  time.Duration
}

JobCompletedData is emitted when a job completes.

type JobContext

type JobContext struct {
	// Context is the Go context with cancellation
	Context context.Context
	// JobID is the unique job identifier
	JobID string
	// QueueName is the queue this job belongs to
	QueueName string
	// Payload is the job payload data
	Payload map[string]any
	// RetryCount is the current retry attempt number
	RetryCount int
	// MaxRetries is the maximum number of retries
	MaxRetries int
	// Progress reports job progress (0-100)
	Progress func(percent float64, message string) error
	// Log logs a message at the specified level
	Log func(level string, message string, meta map[string]any)
	// contains filtered or unexported fields
}

JobContext provides context and utilities for job handlers.

type JobFailedData

type JobFailedData struct {
	JobID     string
	QueueName string
	Error     error
	Duration  time.Duration
	WillRetry bool
}

JobFailedData is emitted when a job fails.

type JobHandler

type JobHandler func(ctx *JobContext) (map[string]any, error)

JobHandler is a function that processes a job. Return an error to fail the job, or nil/result to complete it.

type JobProgressData

type JobProgressData struct {
	JobID   string
	Percent float64
	Message string
}

JobProgressData is emitted when job progress is updated.

type JobStartedData

type JobStartedData struct {
	JobID     string
	QueueName string
}

JobStartedData is emitted when job processing starts.

type Options

type Options struct {
	// QueueName is the name of the queue to process
	QueueName string
	// Hostname is the worker hostname (default: auto-detected)
	Hostname string
	// WorkerType is an identifier for this worker type
	WorkerType string
	// Concurrency is the maximum concurrent jobs (1-100, default: 5)
	Concurrency int
	// PollInterval is the polling interval (default: 1s)
	PollInterval time.Duration
	// LeaseDuration is the job lease duration in seconds (5-3600, default: 30)
	LeaseDuration int
	// HeartbeatFraction is the heartbeat interval as a fraction of lease duration (default: 0.5)
	HeartbeatFraction float64
	// ShutdownTimeout is the graceful shutdown timeout (default: 30s)
	ShutdownTimeout time.Duration
	// Version is the worker version string
	Version string
	// Metadata is additional worker metadata
	Metadata map[string]string
	// Debug enables debug logging
	Debug bool
	// Logger is a custom logger function
	Logger func(msg string, args ...any)
}

Options configures a worker.

func DefaultOptions

func DefaultOptions() Options

DefaultOptions returns options with sensible defaults.

type State

type State string

State represents the worker state.

const (
	StateIdle     State = "idle"
	StateStarting State = "starting"
	StateRunning  State = "running"
	StateStopping State = "stopping"
	StateStopped  State = "stopped"
	StateError    State = "error"
)

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker processes jobs from a Spooled queue using REST polling.

func NewWorker

func NewWorker(jobs *resources.JobsResource, workers *resources.WorkersResource, opts Options) *Worker

NewWorker creates a new REST polling worker.

func (*Worker) ActiveJobCount

func (w *Worker) ActiveJobCount() int

ActiveJobCount returns the number of jobs currently being processed.

func (*Worker) OnEvent

func (w *Worker) OnEvent(handler EventHandler)

OnEvent registers an event handler.

func (*Worker) Process

func (w *Worker) Process(handler JobHandler)

Process registers a job handler.

func (*Worker) Start

func (w *Worker) Start(ctx context.Context) error

Start starts the worker.

func (*Worker) State

func (w *Worker) State() State

State returns the current worker state.

func (*Worker) Stop

func (w *Worker) Stop() error

Stop gracefully stops the worker.

func (*Worker) WorkerID

func (w *Worker) WorkerID() string

WorkerID returns the registered worker ID.

type WorkerErrorData

type WorkerErrorData struct {
	Error error
}

WorkerErrorData is emitted on worker errors.

type WorkerStartedData

type WorkerStartedData struct {
	WorkerID  string
	QueueName string
}

WorkerStartedData is emitted when the worker starts.

type WorkerStoppedData

type WorkerStoppedData struct {
	WorkerID string
	Reason   string
}

WorkerStoppedData is emitted when the worker stops.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL