Documentation ¶
Index ¶
- Constants
- type Config
- func NewInMemoryConfig() Config
- func NewMySQLConfig(connectionString string) Config
- func NewPostgresConfig(connectionString string) Config
- func NewRedisConfig(address, username, password string, db int) Config
- func NewSQSConfig(queueURL, region, accessKeyID, secretAccessKey string) Config
- func NewSQSFifoConfig(queueURL, region, accessKeyID, secretAccessKey, messageGroupID string) Config
- func (c Config) Validate(logger logger.Logger) error
- func (c Config) WithConcurrencyLimit(limit int) Config
- func (c Config) WithDLQAdapter(adapter dlq.DLQAdapter) Config
- func (c Config) WithExponentialBackoff(enabled bool) Config
- func (c Config) WithJobTimeout(timeout time.Duration) Config
- func (c Config) WithMaxRetryAttempts(attempts int) Config
- func (c Config) WithMaxWorkers(maxWorkers int) Config
- func (c Config) WithMetricsCallback(callback MetricsCallback) Config
- func (c Config) WithMiddleware(m middleware.Middleware) Config
- func (c Config) WithMiddlewares(middlewares ...middleware.Middleware) Config
- func (c Config) WithRetryDelay(delay time.Duration) Config
- func (c Config) WithStats(enabled bool) Config
- type DatabaseConfig
- type DriverConfig
- type JobMetrics
- type MetricsCallback
- type RedisConfig
- type SQSConfig
Constants ¶
const (
// DriverMemory uses in-memory storage, suitable for development and testing
DriverMemory = "memory"
// DriverRedis uses Redis as the backend storage
DriverRedis = "redis"
// DriverSQS uses AWS SQS as the backend storage
DriverSQS = "sqs"
// DriverDatabase is reserved for future database backend support
DriverDatabase = "database"
)
Driver type constants for supported queue backends
const (
DatabaseTypePostgres = "postgres"
DatabaseTypeMySQL = "mysql"
)
Types ¶
type Config ¶
type Config struct {
// Driver specifies the queue backend ("memory", "redis", "sqs", or "database")
Driver string
// DriverConfig contains driver-specific configuration
DriverConfig DriverConfig
// MaxWorkers is the maximum number of concurrent worker goroutines
MaxWorkers int
// ConcurrencyLimit is the maximum number of jobs that can be processed simultaneously
ConcurrencyLimit int
// OnJobComplete is called when a job finishes processing
OnJobComplete MetricsCallback
// StatsEnabled enables collection of queue statistics
StatsEnabled bool
// MaxRetryAttempts is the number of times to retry a failed job
MaxRetryAttempts int
// RetryDelay is the base delay between retry attempts
RetryDelay time.Duration
// ExponentialBackoff enables exponential increase of retry delays
ExponentialBackoff bool
// DLQAdapter handles failed jobs after max retries
DLQAdapter dlq.DLQAdapter
// Middlewares is the chain of job processing middleware
Middlewares []middleware.Middleware
// JobTimeout is the default timeout for job execution (can be overridden per job)
JobTimeout time.Duration
}
Config holds all configuration options for a queue instance. Use the With* methods to set specific options in a fluent manner.
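The With* methods have a value receiver and return a Config, so each call yields a modified copy and the result has to be assigned or chained. The sketch below illustrates that pattern with a hypothetical stand-in type `config` that carries only two fields; it is not the package's own code.

```go
package main

import "fmt"

// config is a hypothetical stand-in for the package's Config type,
// reduced to two fields for illustration.
type config struct {
	MaxWorkers       int
	MaxRetryAttempts int
}

// withMaxWorkers mirrors the shape of Config.WithMaxWorkers: a value
// receiver that returns a modified copy.
func (c config) withMaxWorkers(n int) config {
	c.MaxWorkers = n
	return c
}

// withMaxRetryAttempts mirrors the shape of Config.WithMaxRetryAttempts.
func (c config) withMaxRetryAttempts(n int) config {
	c.MaxRetryAttempts = n
	return c
}

func main() {
	base := config{MaxWorkers: 1}

	// Chained calls build a new value; base itself is left unchanged.
	tuned := base.withMaxWorkers(8).withMaxRetryAttempts(3)

	fmt.Println(base.MaxWorkers, tuned.MaxWorkers, tuned.MaxRetryAttempts)
}
```

A call whose return value is discarded has no effect on the original value, which is the usual pitfall with this style.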
func NewInMemoryConfig ¶
func NewInMemoryConfig() Config
NewInMemoryConfig creates a new Config instance with in-memory driver and sensible defaults. This is suitable for development and testing environments.
The in-memory driver stores all jobs in memory and does not provide persistence across application restarts. Jobs will be lost if the application terminates.
Returns:
- A Config instance configured with the in-memory driver
func NewMySQLConfig ¶
func NewMySQLConfig(connectionString string) Config
NewMySQLConfig creates a new Config instance with MySQL driver and sensible defaults. This is suitable for production environments using MySQL as the queue backend.
The MySQL driver provides durable storage and can be used in distributed environments where multiple application instances need to share the same job queue.
Parameters:
- connectionString: MySQL connection string (e.g., "user:password@tcp(127.0.0.1:3306)/dbname?parseTime=true")
Returns:
- A Config instance configured with the MySQL database driver
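func NewPostgresConfig ¶
func NewPostgresConfig(connectionString string) Config
func NewRedisConfig ¶
func NewRedisConfig(address, username, password string, db int) Config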
NewRedisConfig creates a new Config instance with Redis driver and sensible defaults. This is suitable for production environments.
The Redis driver provides persistence and can be used in distributed environments where multiple application instances need to share the same job queue.
Parameters:
- address: Redis server address (e.g., "localhost:6379")
- username: Redis server username, or empty string if no username
- password: Redis server password, or empty string if no password
- db: Redis database number to use
Returns:
- A Config instance configured with the Redis driver
func NewSQSConfig ¶
func NewSQSConfig(queueURL, region, accessKeyID, secretAccessKey string) Config
NewSQSConfig creates a new Config instance with AWS SQS driver and sensible defaults. This is suitable for production environments with AWS SQS standard queues.
The SQS driver provides fully managed message queuing with high availability and durability. It's appropriate for cloud-based applications that need reliable message processing.
Parameters:
- queueURL: The URL of the SQS queue (from AWS console or API)
- region: AWS region where the queue is located (e.g., "us-west-2")
- accessKeyID: AWS access key ID for authentication, or empty to use environment/instance profile
- secretAccessKey: AWS secret access key for authentication, or empty to use environment/instance profile
Returns:
- A Config instance configured with the SQS driver for standard queues
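func NewSQSFifoConfig ¶
func NewSQSFifoConfig(queueURL, region, accessKeyID, secretAccessKey, messageGroupID string) Config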
NewSQSFifoConfig creates a new Config instance with AWS SQS FIFO queue driver. This is suitable for production environments requiring exactly-once processing and message ordering guarantees.
FIFO (First-In-First-Out) queues provide additional guarantees compared to standard SQS queues:
- Messages are processed in the exact order they are sent
- Messages are delivered exactly once with no duplicates
Parameters:
- queueURL: The URL of the SQS FIFO queue (must end with .fifo)
- region: AWS region where the queue is located (e.g., "us-west-2")
- accessKeyID: AWS access key ID for authentication, or empty to use environment/instance profile
- secretAccessKey: AWS secret access key for authentication, or empty to use environment/instance profile
- messageGroupID: FIFO queue message group ID (required), defines which messages are processed in order
Returns:
- A Config instance configured with the SQS driver for FIFO queues
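The two FIFO-specific requirements listed above (a queue URL ending in .fifo and a message group ID) can be checked before the constructor is called. The helper `checkFifoArgs` below is hypothetical and the queue URL is a placeholder; whether the package performs a similar check itself is not stated here.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// checkFifoArgs is a hypothetical pre-flight check for the arguments
// passed to NewSQSFifoConfig. It is not part of the package.
func checkFifoArgs(queueURL, messageGroupID string) error {
	if !strings.HasSuffix(queueURL, ".fifo") {
		return fmt.Errorf("queue URL %q does not end with .fifo", queueURL)
	}
	if messageGroupID == "" {
		return errors.New("message group ID is required for FIFO queues")
	}
	return nil
}

func main() {
	// Placeholder URLs; substitute the URL of a real queue.
	fmt.Println(checkFifoArgs("https://sqs.us-west-2.amazonaws.com/123456789012/jobs.fifo", "orders"))
	fmt.Println(checkFifoArgs("https://sqs.us-west-2.amazonaws.com/123456789012/jobs", "orders"))
}
```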
func (Config) Validate ¶
func (c Config) Validate(logger logger.Logger) error
Validate checks if the configuration is valid. It returns an error if any required fields are missing or invalid.
This method verifies that the configuration has valid worker counts, concurrency limits, and a supported driver type. It logs any validation errors found.
Parameters:
- logger: Logger to record validation errors
Returns:
- nil if the configuration is valid
- an error describing the validation failure
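The checks described above (worker count, concurrency limit, supported driver) might look roughly like the following. This is a sketch on a hypothetical stand-in type `config`; the exact rules, the error messages, and the logging done by the real Validate are assumptions or omitted.

```go
package main

import (
	"errors"
	"fmt"
)

// Driver identifiers, using the values of the documented constants.
const (
	driverMemory   = "memory"
	driverRedis    = "redis"
	driverSQS      = "sqs"
	driverDatabase = "database"
)

// config is a hypothetical stand-in for Config with only the fields
// that the description of Validate mentions.
type config struct {
	Driver           string
	MaxWorkers       int
	ConcurrencyLimit int
}

// validate returns the first problem found.
// ASSUMPTION: both numeric settings must be greater than zero.
func (c config) validate() error {
	if c.MaxWorkers <= 0 {
		return errors.New("max workers must be greater than zero")
	}
	if c.ConcurrencyLimit <= 0 {
		return errors.New("concurrency limit must be greater than zero")
	}
	switch c.Driver {
	case driverMemory, driverRedis, driverSQS, driverDatabase:
		return nil
	default:
		return fmt.Errorf("unsupported driver %q", c.Driver)
	}
}

func main() {
	good := config{Driver: driverMemory, MaxWorkers: 4, ConcurrencyLimit: 4}
	bad := config{Driver: "kafka", MaxWorkers: 4, ConcurrencyLimit: 4}
	fmt.Println(good.validate())
	fmt.Println(bad.validate())
}
```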
func (Config) WithConcurrencyLimit ¶
func (c Config) WithConcurrencyLimit(limit int) Config
WithConcurrencyLimit sets the maximum number of jobs that can be processed simultaneously.
This controls the total number of jobs that can be in-process at once, which may be different from the number of workers. This is useful for rate limiting and preventing system overload.
Parameters:
- limit: Maximum number of concurrent jobs
Returns:
- Updated Config with the concurrency limit setting
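One common way to cap the number of in-flight jobs independently of the number of goroutines is a buffered channel used as a semaphore. The sketch below shows that technique; `runWithLimit` is a hypothetical helper, and nothing here claims that the package enforces its limit this way.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runWithLimit runs every job, letting at most limit of them execute at
// the same time, and returns the highest number observed in flight.
// limit must be at least 1. ASSUMPTION: a channel-based semaphore; the
// package's actual mechanism is not documented here.
func runWithLimit(limit int, jobs []func()) int {
	sem := make(chan struct{}, limit) // one slot per allowed in-flight job
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		inFlight int
		peak     int
	)
	for _, job := range jobs {
		wg.Add(1)
		go func(job func()) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot, blocking while all are taken
			defer func() { <-sem }() // release the slot when the job is done

			mu.Lock()
			inFlight++
			if inFlight > peak {
				peak = inFlight
			}
			mu.Unlock()

			job()

			mu.Lock()
			inFlight--
			mu.Unlock()
		}(job)
	}
	wg.Wait()
	return peak
}

func main() {
	jobs := make([]func(), 20)
	for i := range jobs {
		jobs[i] = func() { time.Sleep(5 * time.Millisecond) }
	}
	// The reported peak never exceeds the limit of 3.
	fmt.Println("peak in flight:", runWithLimit(3, jobs))
}
```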
func (Config) WithDLQAdapter ¶
func (c Config) WithDLQAdapter(adapter dlq.DLQAdapter) Config
WithDLQAdapter sets the Dead Letter Queue adapter for handling failed jobs.
A Dead Letter Queue (DLQ) is used to store jobs that have failed after exceeding their retry attempts. This allows for later analysis, debugging, or manual reprocessing.
Parameters:
- adapter: An implementation of the dlq.DLQAdapter interface
Returns:
- Updated Config with the DLQ adapter set
func (Config) WithExponentialBackoff ¶
func (c Config) WithExponentialBackoff(enabled bool) Config
WithExponentialBackoff enables or disables exponential increase of retry delays.
When enabled, the delay between retry attempts will increase exponentially based on the retry count. This helps to prevent overwhelming the system with retry attempts when there are persistent issues.
Parameters:
- enabled: Whether to use exponential backoff for retries
Returns:
- Updated Config with the exponential backoff setting
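To make the interaction between the base retry delay and exponential backoff concrete, the sketch below computes the wait before each retry. The doubling factor is an assumption, and `backoffDelay` is a hypothetical helper; the package may use a different multiplier, jitter, or a maximum delay.

```go
package main

import (
	"fmt"
	"time"
)

// backoffDelay returns the wait before the given retry (1-based).
// ASSUMPTION: the delay doubles on every retry when exponential is true.
func backoffDelay(base time.Duration, retry int, exponential bool) time.Duration {
	if !exponential || retry <= 1 {
		return base
	}
	return base << (retry - 1) // base * 2^(retry-1)
}

func main() {
	for retry := 1; retry <= 4; retry++ {
		fmt.Println(retry, backoffDelay(500*time.Millisecond, retry, true))
	}
}
```

With a base delay of 500ms this yields 500ms, 1s, 2s, and 4s for the first four retries; with backoff disabled every retry waits 500ms.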
func (Config) WithJobTimeout ¶
func (c Config) WithJobTimeout(timeout time.Duration) Config
WithJobTimeout sets the default timeout for job execution.
This sets the maximum duration that a job can run before it's considered timed out. Individual jobs can override this timeout by setting their own timeout in the job context.
Parameters:
- timeout: The maximum duration for job execution
Returns:
- Updated Config with the job timeout set
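A default job timeout is typically enforced with a context deadline. The sketch below shows that general technique using the standard context package; `runWithTimeout` and `sleepJob` are hypothetical helpers, and how the package itself applies or overrides the timeout is not shown here.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runWithTimeout runs job with a deadline and returns the context error
// if the job has not finished when the deadline passes.
func runWithTimeout(timeout time.Duration, job func(ctx context.Context) error) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	done := make(chan error, 1) // buffered so the goroutine never blocks on send
	go func() { done <- job(ctx) }()

	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}

// sleepJob returns a job that takes d to finish unless it is cancelled first.
func sleepJob(d time.Duration) func(context.Context) error {
	return func(ctx context.Context) error {
		select {
		case <-time.After(d):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func main() {
	err := runWithTimeout(20*time.Millisecond, sleepJob(time.Second))
	fmt.Println("timed out:", errors.Is(err, context.DeadlineExceeded))

	err = runWithTimeout(time.Second, sleepJob(time.Millisecond))
	fmt.Println("fast job error:", err)
}
```

A job only stops early if it watches the context, as `sleepJob` does; a job that ignores cancellation keeps running in the background after the timeout is reported.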
func (Config) WithMaxRetryAttempts ¶
func (c Config) WithMaxRetryAttempts(attempts int) Config
WithMaxRetryAttempts sets the number of times to retry a failed job.
When a job fails with an error, it will be retried up to this many times before being considered permanently failed and potentially sent to the DLQ. Set to 0 to disable retries.
Parameters:
- attempts: Maximum number of retry attempts
Returns:
- Updated Config with the max retry attempts setting
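The retry-then-dead-letter flow described above can be sketched as a simple loop. `process` and the `deadLetter` callback are hypothetical, the wait between attempts is omitted, and the sketch assumes that the setting counts retries after the initial run (so 0 means the job runs exactly once).

```go
package main

import (
	"errors"
	"fmt"
)

// errFail is a sample error used by the demo job.
var errFail = errors.New("job failed")

// process runs job once and retries it up to maxRetries more times.
// If every run fails, the last error is handed to deadLetter and returned.
// ASSUMPTION: maxRetries excludes the initial run.
func process(job func() error, maxRetries int, deadLetter func(error)) error {
	if maxRetries < 0 {
		maxRetries = 0
	}
	var err error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if err = job(); err == nil {
			return nil
		}
	}
	deadLetter(err)
	return err
}

func main() {
	runs := 0
	err := process(
		func() error { runs++; return errFail },
		2,
		func(e error) { fmt.Println("sent to DLQ:", e) },
	)
	fmt.Println("runs:", runs, "error:", err)
}
```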
func (Config) WithMaxWorkers ¶
func (c Config) WithMaxWorkers(maxWorkers int) Config
WithMaxWorkers sets the maximum number of concurrent worker goroutines.
This controls how many worker goroutines will be spawned to process jobs. Each worker can process one job at a time. The optimal number depends on your workload and available resources.
Parameters:
- maxWorkers: Maximum number of worker goroutines to spawn
Returns:
- Updated Config with the max workers setting
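The worker model described above, a fixed number of goroutines that each handle one job at a time, corresponds to a classic worker pool. The sketch below shows the general pattern; `runPool` is a hypothetical helper and is not taken from the package.

```go
package main

import (
	"fmt"
	"sync"
)

// runPool processes inputs with exactly maxWorkers goroutines, each
// handling one item at a time. maxWorkers must be at least 1.
func runPool(maxWorkers int, inputs []int, handle func(int) int) []int {
	results := make([]int, len(inputs))
	indexes := make(chan int)

	var wg sync.WaitGroup
	for w := 0; w < maxWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each index is received by exactly one worker, so the
			// writes to results never touch the same element twice.
			for i := range indexes {
				results[i] = handle(inputs[i])
			}
		}()
	}

	for i := range inputs {
		indexes <- i
	}
	close(indexes) // lets the workers exit their range loops
	wg.Wait()
	return results
}

func main() {
	squares := runPool(3, []int{1, 2, 3, 4, 5}, func(n int) int { return n * n })
	fmt.Println(squares)
}
```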
func (Config) WithMetricsCallback ¶
func (c Config) WithMetricsCallback(callback MetricsCallback) Config
WithMetricsCallback sets the callback function for job completion metrics.
This function will be called after every job completes, with metrics including job ID, duration, error status, and timestamps. This is useful for monitoring, alerting, and performance tracking.
Parameters:
- callback: Function to call with job metrics on completion
Returns:
- Updated Config with the metrics callback set
func (Config) WithMiddleware ¶
func (c Config) WithMiddleware(m middleware.Middleware) Config
WithMiddleware adds a middleware to the processing chain. Middlewares are executed in the order they are added.
Middleware allows you to add cross-cutting functionality to job processing, such as logging, metrics collection, validation, or rate limiting.
Parameters:
- m: The middleware function to add
Returns:
- Updated Config with the middleware added
func (Config) WithMiddlewares ¶
func (c Config) WithMiddlewares(middlewares ...middleware.Middleware) Config
WithMiddlewares adds multiple middlewares to the processing chain. Middlewares are executed in the order they are added.
This is a convenience method for adding multiple middlewares at once.
Parameters:
- middlewares: One or more middleware functions to add
Returns:
- Updated Config with all the middlewares added
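The ordering rule for middlewares can be illustrated with a small chain builder. The `handler` and `middleware` types below are hypothetical stand-ins, since the definition of middleware.Middleware is not shown in this document; the sketch only demonstrates how wrapping in reverse makes the first middleware added run first.

```go
package main

import "fmt"

// handler and middleware are hypothetical stand-ins; the package's own
// middleware.Middleware type may have a different shape.
type handler func(job string) error
type middleware func(next handler) handler

// chain wraps h so that mws[0] runs first, then mws[1], and so on.
// Wrapping starts from the last middleware so the first ends up outermost.
func chain(h handler, mws ...middleware) handler {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// tag returns a middleware that records its name before calling next.
func tag(name string, log *[]string) middleware {
	return func(next handler) handler {
		return func(job string) error {
			*log = append(*log, name)
			return next(job)
		}
	}
}

func main() {
	var log []string
	h := chain(
		func(job string) error { log = append(log, "handler"); return nil },
		tag("logging", &log),
		tag("metrics", &log),
	)
	_ = h("job-1")
	fmt.Println(log)
}
```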
func (Config) WithRetryDelay ¶
func (c Config) WithRetryDelay(delay time.Duration) Config
WithRetryDelay sets the base delay between retry attempts.
This is the initial delay between retry attempts. If exponential backoff is enabled, this delay will increase with each retry attempt.
Parameters:
- delay: Base time to wait between retry attempts
Returns:
- Updated Config with the retry delay setting
func (Config) WithStats ¶
func (c Config) WithStats(enabled bool) Config
WithStats enables or disables queue statistics collection.
When enabled, the queue will collect metrics about job processing rates, queue sizes, and health indicators. This is useful for monitoring and debugging, but has a small performance cost.
Parameters:
- enabled: Whether to enable statistics collection
Returns:
- Updated Config with statistics collection setting
type DatabaseConfig ¶
type DatabaseConfig struct {
ConnectionString string
DatabaseType string // "postgres", "mysql".
// Migration options
AutoMigrate bool
// Connection pool settings
MaxOpenConns int
MaxIdleConns int
ConnMaxLifetime time.Duration
}
func (DatabaseConfig) Type ¶
func (c DatabaseConfig) Type() string
type DriverConfig ¶
type DriverConfig interface {
// Type returns the driver type identifier (e.g., "redis", "memory")
Type() string
}
DriverConfig defines the interface that all driver-specific configurations must implement.
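Since the interface has a single method, code that receives a DriverConfig can branch on the concrete type to reach driver-specific fields. The sketch below copies the interface and the RedisConfig fields from this page so that it compiles on its own; the "redis" return value of Type and the `describe` helper are assumptions for illustration.

```go
package main

import "fmt"

// DriverConfig is copied from the interface declaration on this page.
type DriverConfig interface {
	Type() string
}

// RedisConfig mirrors the documented struct fields.
type RedisConfig struct {
	Addr     string
	Username string
	Password string
	Db       int
}

// Type implements DriverConfig.
// ASSUMPTION: the identifier matches the DriverRedis constant ("redis").
func (r RedisConfig) Type() string { return "redis" }

// Compile-time check that RedisConfig satisfies the interface.
var _ DriverConfig = RedisConfig{}

// describe is a hypothetical helper that uses a type switch to reach
// driver-specific fields.
func describe(dc DriverConfig) string {
	switch c := dc.(type) {
	case RedisConfig:
		return fmt.Sprintf("%s at %s (db %d)", c.Type(), c.Addr, c.Db)
	default:
		return c.Type()
	}
}

func main() {
	fmt.Println(describe(RedisConfig{Addr: "localhost:6379", Db: 2}))
}
```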
type JobMetrics ¶
type JobMetrics struct {
// QueueName is the name of the queue the job was processed in
QueueName string
// JobID is the unique identifier of the processed job
JobID string
// Duration is how long the job took to process
Duration time.Duration
// Error contains any error that occurred during processing
Error error
// Timestamp is when the metrics were collected
Timestamp time.Time
}
JobMetrics contains metrics data for a completed job. This is passed to the MetricsCallback when a job completes processing.
type MetricsCallback ¶
type MetricsCallback func(metrics JobMetrics)
MetricsCallback is a function type for handling job completion metrics. It is called after a job completes processing, whether successful or not.
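A callback of this type is a natural place to aggregate counters. The sketch below copies the JobMetrics and MetricsCallback declarations from this page so that it compiles on its own; the `counters` type is hypothetical, and the mutex reflects the assumption that the callback may be invoked from several worker goroutines at once.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// JobMetrics and MetricsCallback are copied from the declarations on this page.
type JobMetrics struct {
	QueueName string
	JobID     string
	Duration  time.Duration
	Error     error
	Timestamp time.Time
}

type MetricsCallback func(metrics JobMetrics)

// errBoom is a sample error used by the demo.
var errBoom = errors.New("boom")

// counters is a hypothetical aggregate of completed and failed jobs.
type counters struct {
	mu        sync.Mutex
	completed int
	failed    int
	total     time.Duration
}

// callback returns a MetricsCallback that updates the counters.
func (c *counters) callback() MetricsCallback {
	return func(m JobMetrics) {
		c.mu.Lock()
		defer c.mu.Unlock()
		c.completed++
		if m.Error != nil {
			c.failed++
		}
		c.total += m.Duration
	}
}

func main() {
	c := &counters{}
	onComplete := c.callback()

	onComplete(JobMetrics{QueueName: "emails", JobID: "job-1", Duration: 120 * time.Millisecond, Timestamp: time.Now()})
	onComplete(JobMetrics{QueueName: "emails", JobID: "job-2", Duration: 80 * time.Millisecond, Error: errBoom, Timestamp: time.Now()})

	fmt.Printf("completed=%d failed=%d total=%s\n", c.completed, c.failed, c.total)
}
```

With the real package, a function of this shape would be passed to WithMetricsCallback.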
type RedisConfig ¶
type RedisConfig struct {
// Addr is the Redis server address (e.g., "localhost:6379")
Addr string
// Username is the Redis server username (optional)
Username string
// Password is the Redis server password (optional)
Password string
// Db is the Redis database number to use
Db int
}
RedisConfig contains configuration options specific to the Redis driver.
func (RedisConfig) Type ¶
func (r RedisConfig) Type() string
Type implements the DriverConfig interface.
type SQSConfig ¶
type SQSConfig struct {
// QueueURL is the URL of the SQS queue
QueueURL string
// Region is the AWS region where the SQS queue is located
Region string
// AccessKeyID is the AWS access key ID for authentication
AccessKeyID string
// SecretAccessKey is the AWS secret access key for authentication
SecretAccessKey string
// MaxMessages is the maximum number of messages to retrieve in a single batch (1-10)
MaxMessages int
// VisibilityTimeout is the duration that messages are hidden from subsequent retrieve requests
VisibilityTimeout time.Duration
// IsFifo indicates whether the SQS queue is a FIFO queue
IsFifo bool
// MessageGroupID is the FIFO queue message group ID (required for FIFO queues)
// If not specified, a default value of "default" will be used for FIFO queues
MessageGroupID string
// MessageDeduplicationID is used for FIFO queues to prevent duplicate messages (optional)
// If not specified, a unique ID will be generated automatically for each message
MessageDeduplicationID string
}
SQSConfig contains configuration options specific to the AWS SQS driver.