Documentation
¶
Overview ¶
Package jobs defines interfaces for asynchronous job processing in RoadRunner. It provides the Driver interface for queue backends (e.g. AMQP, Kafka, Beanstalk), the Pipeline interface for configuring named pipelines, and the Message and Job interfaces for producing and consuming queue items. The State struct exposes runtime counters such as active, delayed, and reserved job counts.
Index ¶
Constants ¶
// Keys used to pack and unpack messages exchanged with the different drivers.
// Each constant is declared on its own line so that the block is valid Go.
const (
	RRID       string = "rr_id"
	RRJob      string = "rr_job"
	RRHeaders  string = "rr_headers"
	RRPipeline string = "rr_pipeline"
	RRDelay    string = "rr_delay"
	RRPriority string = "rr_priority"
	RRAutoAck  string = "rr_auto_ack"
)
Constant keys used to pack and unpack messages from different drivers.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Constructor ¶
type Constructor interface {
	// Name reports the driver's name.
	Name() string

	// DriverFromConfig builds a driver (kafka, amqp, ...) whose settings are
	// read from the configuration under the provided configKey.
	DriverFromConfig(ctx context.Context, configKey string, queue Queue, pipeline Pipeline) (Driver, error)

	// DriverFromPipeline builds a driver (kafka, amqp, ...) from the pipeline
	// alone; the pipeline supplies all of the configuration.
	DriverFromPipeline(ctx context.Context, pipe Pipeline, queue Queue) (Driver, error)
}
Constructor constructs a Driver. It is an Endure abstraction.
type Driver ¶
type Driver interface {
	// Push hands the job over to the underlying driver.
	Push(ctx context.Context, msg Message) error

	// Run begins consuming the given pipeline.
	Run(ctx context.Context, pipeline Pipeline) error

	// Stop halts the consumer and closes the underlying connection.
	Stop(ctx context.Context) error

	// Pause suspends job consumption; pushing jobs is still allowed.
	Pause(ctx context.Context, pipeline string) error

	// Resume restarts the consumer.
	Resume(ctx context.Context, pipeline string) error

	// State reports information about the driver's state.
	State(ctx context.Context) (*State, error)
}
Driver represents the interface for a single jobs driver
type Item ¶
type Item interface {
	// ID reports the item's unique identifier.
	ID() string

	// GroupID reports the group the item belongs to. The group is used to
	// remove all items that share the same groupID.
	GroupID() string

	// Priority reports the priority level by which the item is sorted.
	Priority() int64
}
Item interface represents the base meta-information which any priority queue message must have
type Job ¶
type Job interface {
	Item

	// Ack acknowledges the item once it has been processed.
	Ack() error

	// Nack discards the item.
	Nack() error

	// NackWithOptions discards the item; requeue optionally asks for it to be
	// requeued, and delay is passed along with that request.
	NackWithOptions(requeue bool, delay int) error

	// Requeue returns the message to the queue, optionally after a delay.
	Requeue(headers map[string][]string, delay int) error

	// Body reports the payload carried by the item.
	Body() []byte

	// Context reports any meta-information attached to the item.
	Context() ([]byte, error)

	// Headers returns the item's metadata.
	Headers() map[string][]string
}
Job represents a binary heap item
type KafkaOptions ¶
type KafkaOptions interface {
	// Offset reports the offset associated with the Job.
	Offset() int64

	// Partition reports the partition associated with the Job.
	Partition() int32

	// Topic reports the topic associated with the Job.
	Topic() string

	// Metadata reports the metadata associated with the Job.
	Metadata() string
}
KafkaOptions holds Kafka-specific options (leave them empty for other drivers).
type Message ¶
type Message interface {
	Item
	KafkaOptions

	// Name reports the name of the Job.
	Name() string

	// Payload reports the data carried by the job.
	Payload() []byte

	// Delay reports the delay time for the Job; not every driver supports it.
	Delay() int64

	// AutoAck reports the autocommit status for the Job.
	AutoAck() bool

	// UpdatePriority sets the Job's priority. A priority is optional, but it
	// cannot be set to 0. The default priority is 10.
	UpdatePriority(int64)

	// Headers returns the item's metadata.
	Headers() map[string][]string
}
Message represents the protobuf message received from the RPC call
type Pipeline ¶
// Pipeline describes a named pipeline and gives access to its options.
type Pipeline interface {
	// With stores a value in the pipeline under name.
	With(name string, value any)

	// Name reports the pipeline's name.
	Name() string

	// Driver reports the driver associated with the pipeline.
	Driver() string

	// Has reports whether the pipeline holds a value for name.
	Has(name string) bool

	// String reports the option as a string, or d when it falls back to the
	// default.
	String(name string, d string) string

	// Int reports the option as an int, or d when it falls back to the
	// default.
	Int(name string, d int) int

	// Bool reports the option as a bool, or d when it falls back to the
	// default.
	Bool(name string, d bool) bool

	// Map retrieves the nested map value for name, or an empty config.
	// It may be used, for example, for SQS attributes or tags.
	// NOTE(review): the result is presumably written into out - confirm
	// against the implementation.
	Map(name string, out map[string]string) error

	// Priority reports the pipeline's default priority.
	Priority() int64

	// Get retrieves the data associated with key.
	Get(key string) any
}
type Queue ¶
type Queue interface {
	// Remove deletes the element(s) with the provided ID, if present, and
	// returns what was removed.
	Remove(id string) []Job

	// Insert puts an item into the queue.
	Insert(item Job)

	// ExtractMin returns the highest-priority item; a smaller value means a
	// higher priority.
	ExtractMin() Job

	// Len reports how many items the queue holds.
	Len() uint64
}
Queue represents the JOBS plugin queue together with its element types.
type State ¶
type State struct {
	// Pipeline is the pipeline name.
	Pipeline string
	// Driver is the driver name.
	Driver string
	// Queue is the queue name (the tube, for beanstalk).
	Queue string
	// Active counts jobs consumed by the driver but not yet handled by the
	// PHP worker.
	Active int64
	// Delayed counts delayed jobs.
	Delayed int64
	// Reserved counts jobs that are in the driver but not consumed yet.
	Reserved int64
	// Ready is the status flag: true means ready, false means paused.
	Ready bool
	// Priority is the pipeline priority (new in 2.10.5).
	Priority uint64
	// ErrorMessage is new in 2023.1.
	ErrorMessage string
}
State represents the state of a jobs driver and its pipeline.