config

package
v0.2.3
Warning

This package is not in the latest version of its module.

Published: Jan 12, 2026 License: MIT Imports: 6 Imported by: 0

Documentation

Constants

const (
	// DriverMemory uses in-memory storage, suitable for development and testing
	DriverMemory = "memory"
	// DriverRedis uses Redis as the backend storage
	DriverRedis = "redis"
	// DriverSQS uses AWS SQS as the backend storage
	DriverSQS = "sqs"
	// DriverDatabase is reserved for future database backend support
	DriverDatabase = "database"
)

Driver type constants for supported queue backends

const (
	DatabaseTypePostgres = "postgres"
	DatabaseTypeMySQL    = "mysql"
)

Types

type Config

type Config struct {
	// Driver specifies the queue backend ("memory", "redis", "sqs", or "database")
	Driver string
	// DriverConfig contains driver-specific configuration
	DriverConfig DriverConfig
	// MaxWorkers is the maximum number of concurrent worker goroutines
	MaxWorkers int
	// ConcurrencyLimit is the maximum number of jobs that can be processed simultaneously
	ConcurrencyLimit int
	// OnJobComplete is called when a job finishes processing
	OnJobComplete MetricsCallback
	// StatsEnabled enables collection of queue statistics
	StatsEnabled bool
	// MaxRetryAttempts is the number of times to retry a failed job
	MaxRetryAttempts int
	// RetryDelay is the base delay between retry attempts
	RetryDelay time.Duration
	// ExponentialBackoff enables exponential increase of retry delays
	ExponentialBackoff bool
	// DLQAdapter handles failed jobs after max retries
	DLQAdapter dlq.DLQAdapter
	// Middlewares is the chain of job processing middleware
	Middlewares []middleware.Middleware
	// JobTimeout is the default timeout for job execution (can be overridden per job)
	JobTimeout time.Duration
}

Config holds all configuration options for a queue instance. The With* methods use value receivers and return an updated copy of the Config, so calls can be chained in a fluent manner.
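
The chaining behaviour follows from the value receivers in the method signatures. The sketch below is self-contained and uses a local stand-in `Config` with only a few of the documented fields, because the package's import path is not shown on this page. The method bodies are assumptions about how such setters are typically written, not the package's source.

```go
package main

import (
	"fmt"
	"time"
)

// Config is a local stand-in that copies a few documented fields.
// It is not the package's own type.
type Config struct {
	Driver           string
	MaxWorkers       int
	MaxRetryAttempts int
	RetryDelay       time.Duration
}

// WithMaxWorkers has a value receiver, so it changes a copy and returns it.
func (c Config) WithMaxWorkers(n int) Config {
	c.MaxWorkers = n
	return c
}

// WithMaxRetryAttempts follows the same copy-and-return pattern.
func (c Config) WithMaxRetryAttempts(n int) Config {
	c.MaxRetryAttempts = n
	return c
}

// WithRetryDelay follows the same copy-and-return pattern.
func (c Config) WithRetryDelay(d time.Duration) Config {
	c.RetryDelay = d
	return c
}

func main() {
	base := Config{Driver: "memory", MaxWorkers: 1}
	tuned := base.
		WithMaxWorkers(8).
		WithMaxRetryAttempts(3).
		WithRetryDelay(2 * time.Second)

	// base is unchanged because every call worked on a copy.
	fmt.Println(base.MaxWorkers, tuned.MaxWorkers, tuned.MaxRetryAttempts, tuned.RetryDelay)
}
```

Because each call returns a new value, the result has to be assigned; calling a With* method and discarding the return value has no effect on the original Config.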

func NewInMemoryConfig

func NewInMemoryConfig() Config

NewInMemoryConfig creates a new Config instance with the in-memory driver and sensible defaults. This is suitable for development and testing environments.

The in-memory driver stores all jobs in memory and does not provide persistence across application restarts. Jobs will be lost if the application terminates.

Returns:

  • A Config instance configured with the in-memory driver

func NewMySQLConfig

func NewMySQLConfig(connectionString string) Config

NewMySQLConfig creates a new Config instance with the MySQL driver and sensible defaults. This is suitable for production environments using MySQL as the queue backend.

The MySQL driver provides durable storage and can be used in distributed environments where multiple application instances need to share the same job queue.

Parameters:

  • connectionString: MySQL connection string (e.g., "user:password@tcp(127.0.0.1:3306)/dbname?parseTime=true")

Returns:

  • A Config instance configured with the MySQL database driver

func NewPostgresConfig

func NewPostgresConfig(connectionString string) Config

NewPostgresConfig creates a new Config instance with the PostgreSQL database driver, analogous to NewMySQLConfig.

Parameters:

  • connectionString: PostgreSQL connection string

Returns:

  • A Config instance configured with the PostgreSQL database driver

func NewRedisConfig

func NewRedisConfig(address, username, password string, db int) Config

NewRedisConfig creates a new Config instance with the Redis driver and sensible defaults. This is suitable for production environments.

The Redis driver provides persistence and can be used in distributed environments where multiple application instances need to share the same job queue.

Parameters:

  • address: Redis server address (e.g., "localhost:6379")
  • username: Redis server username, or empty string if no username
  • password: Redis server password, or empty string if no password
  • db: Redis database number to use

Returns:

  • A Config instance configured with the Redis driver

func NewSQSConfig

func NewSQSConfig(queueURL, region, accessKeyID, secretAccessKey string) Config

NewSQSConfig creates a new Config instance with the AWS SQS driver and sensible defaults. This is suitable for production environments with AWS SQS standard queues.

The SQS driver provides fully managed message queuing with high availability and durability. It's appropriate for cloud-based applications that need reliable message processing.

Parameters:

  • queueURL: The URL of the SQS queue (from AWS console or API)
  • region: AWS region where the queue is located (e.g., "us-west-2")
  • accessKeyID: AWS access key ID for authentication, or empty to use environment/instance profile
  • secretAccessKey: AWS secret access key for authentication, or empty to use environment/instance profile

Returns:

  • A Config instance configured with the SQS driver for standard queues

func NewSQSFifoConfig

func NewSQSFifoConfig(queueURL, region, accessKeyID, secretAccessKey, messageGroupID string) Config

NewSQSFifoConfig creates a new Config instance with the AWS SQS FIFO queue driver. This is suitable for production environments requiring exactly-once processing and message ordering guarantees.

FIFO (First-In-First-Out) queues provide additional guarantees compared to standard SQS queues:

  • Messages in the same message group are processed in the exact order they are sent
  • Messages are delivered exactly once with no duplicates

Parameters:

  • queueURL: The URL of the SQS FIFO queue (must end with .fifo)
  • region: AWS region where the queue is located (e.g., "us-west-2")
  • accessKeyID: AWS access key ID for authentication, or empty to use environment/instance profile
  • secretAccessKey: AWS secret access key for authentication, or empty to use environment/instance profile
  • messageGroupID: FIFO queue message group ID (required), defines which messages are processed in order

Returns:

  • A Config instance configured with the SQS driver for FIFO queues
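
The two FIFO-specific requirements listed above (a queue URL ending in .fifo and a message group ID) can be checked before the constructor is called. The following is a minimal sketch; `checkFifoArgs` is a hypothetical helper that is not part of this package, and the queue URLs are placeholders.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// checkFifoArgs is a hypothetical pre-flight check for the arguments that
// NewSQSFifoConfig documents as FIFO-specific. It is not part of the package.
func checkFifoArgs(queueURL, messageGroupID string) error {
	if !strings.HasSuffix(queueURL, ".fifo") {
		return fmt.Errorf("queue URL %q does not end with .fifo", queueURL)
	}
	if messageGroupID == "" {
		return errors.New("messageGroupID must not be empty")
	}
	return nil
}

func main() {
	// The account ID and queue names below are placeholders.
	fmt.Println(checkFifoArgs("https://sqs.us-west-2.amazonaws.com/123456789012/jobs.fifo", "orders"))
	fmt.Println(checkFifoArgs("https://sqs.us-west-2.amazonaws.com/123456789012/jobs", "orders"))
}
```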

func (Config) Validate

func (c Config) Validate(logger logger.Logger) error

Validate checks if the configuration is valid. It returns an error if any required fields are missing or invalid.

This method verifies that the configuration has valid worker counts, concurrency limits, and a supported driver type. It logs any validation errors found.

Parameters:

  • logger: Logger to record validation errors

Returns:

  • nil if the configuration is valid
  • an error describing the validation failure
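
The checks described above (worker count, concurrency limit, supported driver) might look like the sketch below. The exact rules, such as whether zero is accepted, are assumptions, and the real method takes a logger.Logger rather than the plain `logf` function used here. `Config` is a trimmed local stand-in.

```go
package main

import (
	"fmt"
	"log"
)

// Config is a trimmed local stand-in for the documented type.
type Config struct {
	Driver           string
	MaxWorkers       int
	ConcurrencyLimit int
}

// supportedDrivers lists the values of the documented Driver* constants.
var supportedDrivers = map[string]bool{
	"memory": true, "redis": true, "sqs": true, "database": true,
}

// validate returns the first problem found and reports it through logf.
// The thresholds are assumptions, not the package's actual rules.
func validate(c Config, logf func(string, ...interface{})) error {
	var err error
	switch {
	case c.MaxWorkers <= 0:
		err = fmt.Errorf("MaxWorkers must be positive, got %d", c.MaxWorkers)
	case c.ConcurrencyLimit <= 0:
		err = fmt.Errorf("ConcurrencyLimit must be positive, got %d", c.ConcurrencyLimit)
	case !supportedDrivers[c.Driver]:
		err = fmt.Errorf("unsupported driver %q", c.Driver)
	}
	if err != nil {
		logf("invalid config: %v", err)
	}
	return err
}

func main() {
	ok := Config{Driver: "redis", MaxWorkers: 4, ConcurrencyLimit: 4}
	bad := Config{Driver: "kafka", MaxWorkers: 4, ConcurrencyLimit: 4}
	fmt.Println(validate(ok, log.Printf))
	fmt.Println(validate(bad, log.Printf))
}
```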

func (Config) WithConcurrencyLimit

func (c Config) WithConcurrencyLimit(limit int) Config

WithConcurrencyLimit sets the maximum number of jobs that can be processed simultaneously.

This caps the total number of jobs in flight at once, which may differ from the number of workers. It is useful for rate limiting and preventing system overload.

Parameters:

  • limit: Maximum number of concurrent jobs

Returns:

  • Updated Config with the concurrency limit setting
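
One common way to keep the number of in-flight jobs below the number of workers is a counting semaphore built from a buffered channel. The sketch below illustrates that general technique; it is an assumption about how such a limit can be enforced, not a description of this package's internals. `runJobs` is a hypothetical name.

```go
package main

import (
	"fmt"
	"sync"
)

// runJobs processes n jobs on `workers` goroutines while letting at most
// `limit` jobs run at once. It returns the number of jobs completed and the
// highest number of jobs observed in flight.
func runJobs(n, workers, limit int) (done, peak int) {
	if workers < 1 {
		workers = 1
	}
	if limit < 1 {
		limit = 1
	}
	jobs := make(chan int)
	sem := make(chan struct{}, limit) // counting semaphore
	var (
		mu       sync.Mutex
		inFlight int
		wg       sync.WaitGroup
	)
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range jobs {
				sem <- struct{}{} // acquire a slot
				mu.Lock()
				inFlight++
				if inFlight > peak {
					peak = inFlight
				}
				mu.Unlock()

				// The job body would run here.

				mu.Lock()
				inFlight--
				done++
				mu.Unlock()
				<-sem // release the slot
			}
		}()
	}
	for i := 0; i < n; i++ {
		jobs <- i
	}
	close(jobs)
	wg.Wait()
	return done, peak
}

func main() {
	done, peak := runJobs(20, 8, 2)
	fmt.Println(done, peak <= 2)
}
```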

func (Config) WithDLQAdapter

func (c Config) WithDLQAdapter(adapter dlq.DLQAdapter) Config

WithDLQAdapter sets the Dead Letter Queue adapter for handling failed jobs.

A Dead Letter Queue (DLQ) is used to store jobs that have failed after exceeding their retry attempts. This allows for later analysis, debugging, or manual reprocessing.

Parameters:

  • adapter: An implementation of the dlq.DLQAdapter interface

Returns:

  • Updated Config with the DLQ adapter set

func (Config) WithExponentialBackoff

func (c Config) WithExponentialBackoff(enabled bool) Config

WithExponentialBackoff enables or disables exponential increase of retry delays.

When enabled, the delay between retry attempts will increase exponentially based on the retry count. This helps to prevent overwhelming the system with retry attempts when there are persistent issues.

Parameters:

  • enabled: Whether to use exponential backoff for retries

Returns:

  • Updated Config with the exponential backoff setting
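
The documentation does not state the growth factor, so the sketch below assumes the delay doubles on each retry and is capped at a hypothetical `maxDelay`. `retryDelay` is an illustrative helper, not a function of this package.

```go
package main

import (
	"fmt"
	"time"
)

// maxDelay is a hypothetical cap that keeps the delay bounded.
const maxDelay = time.Hour

// retryDelay returns the wait before the given retry attempt (1-based).
// With exponential set, the base delay doubles per attempt (assumed factor).
func retryDelay(base time.Duration, attempt int, exponential bool) time.Duration {
	if base <= 0 {
		return 0
	}
	d := base
	if exponential {
		for i := 1; i < attempt && d < maxDelay; i++ {
			d *= 2
		}
	}
	if d > maxDelay {
		d = maxDelay
	}
	return d
}

func main() {
	for attempt := 1; attempt <= 5; attempt++ {
		fmt.Println(attempt, retryDelay(time.Second, attempt, true))
	}
}
```

Under the doubling assumption, a one-second base delay yields waits of 1s, 2s, 4s, 8s, and 16s for the first five retries; with backoff disabled every retry waits the base delay.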

func (Config) WithJobTimeout

func (c Config) WithJobTimeout(timeout time.Duration) Config

WithJobTimeout sets the default timeout for job execution.

This sets the maximum duration that a job can run before it's considered timed out. Individual jobs can override this timeout by setting their own timeout in the job context.

Parameters:

  • timeout: The maximum duration for job execution

Returns:

  • Updated Config with the job timeout set
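
A default timeout with a per-job override is commonly implemented with context.WithTimeout. The sketch below shows that pattern under stated assumptions: the `Job` signature and the `override` parameter are hypothetical, and the package's own override mechanism (described above as a timeout set in the job context) may differ.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Job is a hypothetical job signature used only for this sketch.
type Job func(ctx context.Context) error

// runWithTimeout runs job under defaultTimeout, or under override when
// override is positive.
func runWithTimeout(defaultTimeout, override time.Duration, job Job) error {
	timeout := defaultTimeout
	if override > 0 {
		timeout = override
	}
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	done := make(chan error, 1) // buffered so the goroutine never blocks on send
	go func() { done <- job(ctx) }()

	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}

// quickJob finishes immediately.
func quickJob(ctx context.Context) error { return nil }

// stuckJob only returns once its context is cancelled or times out.
func stuckJob(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

func main() {
	fmt.Println(runWithTimeout(time.Second, 0, quickJob))
	fmt.Println(runWithTimeout(time.Second, 20*time.Millisecond, stuckJob))
}
```

A job only stops early if it watches its context; the timeout here ends the wait, not the goroutine itself.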

func (Config) WithMaxRetryAttempts

func (c Config) WithMaxRetryAttempts(attempts int) Config

WithMaxRetryAttempts sets the number of times to retry a failed job.

When a job fails with an error, it will be retried up to this many times before being considered permanently failed and potentially sent to the DLQ. Set to 0 to disable retries.

Parameters:

  • attempts: Maximum number of retry attempts

Returns:

  • Updated Config with the max retry attempts setting
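
Read literally, a job with MaxRetryAttempts set to N runs at most N+1 times: one initial attempt plus N retries. The sketch below illustrates that counting and the hand-off to a dead letter handler. `runWithRetries` and `deadLetter` are hypothetical names, and retry delays are omitted for brevity.

```go
package main

import (
	"errors"
	"fmt"
)

var errBoom = errors.New("boom")

// runWithRetries runs job once and retries it up to maxRetries more times.
// If every attempt fails, the last error is passed to deadLetter (when set).
func runWithRetries(job func() error, maxRetries int, deadLetter func(error)) (attempts int, err error) {
	if maxRetries < 0 {
		maxRetries = 0
	}
	for attempts < maxRetries+1 {
		attempts++
		if err = job(); err == nil {
			return attempts, nil
		}
	}
	if deadLetter != nil {
		deadLetter(err)
	}
	return attempts, err
}

func main() {
	calls := 0
	flaky := func() error {
		calls++
		if calls < 3 {
			return errBoom
		}
		return nil
	}
	attempts, err := runWithRetries(flaky, 3, nil)
	fmt.Println(attempts, err)
}
```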

func (Config) WithMaxWorkers

func (c Config) WithMaxWorkers(maxWorkers int) Config

WithMaxWorkers sets the maximum number of concurrent worker goroutines.

This controls how many worker goroutines will be spawned to process jobs. Each worker can process one job at a time. The optimal number depends on your workload and available resources.

Parameters:

  • maxWorkers: Maximum number of worker goroutines to spawn

Returns:

  • Updated Config with the max workers setting

func (Config) WithMetricsCallback

func (c Config) WithMetricsCallback(callback MetricsCallback) Config

WithMetricsCallback sets the callback function for job completion metrics.

This function will be called after every job completes, with metrics including the queue name, job ID, duration, error (if any), and a timestamp. This is useful for monitoring, alerting, and performance tracking.

Parameters:

  • callback: Function to call with job metrics on completion

Returns:

  • Updated Config with the metrics callback set

func (Config) WithMiddleware

func (c Config) WithMiddleware(m middleware.Middleware) Config

WithMiddleware adds a middleware to the processing chain. Middlewares are executed in the order they are added.

Middleware allows you to add cross-cutting functionality to job processing, such as logging, metrics collection, validation, or rate limiting.

Parameters:

  • m: The middleware function to add

Returns:

  • Updated Config with the middleware added
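
The definition of middleware.Middleware is not shown on this page, so the sketch below uses hypothetical `Handler` and `Middleware` types to illustrate what "executed in the order they are added" usually means: the first middleware added is the outermost wrapper and therefore runs first.

```go
package main

import "fmt"

// Handler and Middleware are hypothetical types for this sketch. The real
// middleware.Middleware type may have a different shape.
type Handler func(job string) error
type Middleware func(next Handler) Handler

// chain wraps final so that mws[0] runs first, then mws[1], and so on.
func chain(final Handler, mws ...Middleware) Handler {
	h := final
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// tag returns a middleware that records its name before calling next.
func tag(name string, trace *[]string) Middleware {
	return func(next Handler) Handler {
		return func(job string) error {
			*trace = append(*trace, name)
			return next(job)
		}
	}
}

func main() {
	var trace []string
	handler := func(job string) error {
		trace = append(trace, "handler")
		return nil
	}
	h := chain(handler, tag("first", &trace), tag("second", &trace))
	_ = h("job-1")
	fmt.Println(trace)
}
```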

func (Config) WithMiddlewares

func (c Config) WithMiddlewares(middlewares ...middleware.Middleware) Config

WithMiddlewares adds multiple middlewares to the processing chain. Middlewares are executed in the order they are added.

This is a convenience method for adding multiple middlewares at once.

Parameters:

  • middlewares: One or more middleware functions to add

Returns:

  • Updated Config with all the middlewares added

func (Config) WithRetryDelay

func (c Config) WithRetryDelay(delay time.Duration) Config

WithRetryDelay sets the base delay between retry attempts.

This is the initial delay between retry attempts. If exponential backoff is enabled, this delay will increase with each retry attempt.

Parameters:

  • delay: Base time to wait between retry attempts

Returns:

  • Updated Config with the retry delay setting

func (Config) WithStats

func (c Config) WithStats(enabled bool) Config

WithStats enables or disables queue statistics collection.

When enabled, the queue will collect metrics about job processing rates, queue sizes, and health indicators. This is useful for monitoring and debugging, but has a small performance cost.

Parameters:

  • enabled: Whether to enable statistics collection

Returns:

  • Updated Config with statistics collection setting

type DatabaseConfig

type DatabaseConfig struct {
	ConnectionString string
	DatabaseType     string // "postgres", "mysql".

	// Migration options
	AutoMigrate bool
	// Connection pool settings
	MaxOpenConns    int
	MaxIdleConns    int
	ConnMaxLifetime time.Duration
}

func (DatabaseConfig) Type

func (c DatabaseConfig) Type() string

Type implements the DriverConfig interface.

type DriverConfig

type DriverConfig interface {
	// Type returns the driver type identifier (e.g., "redis", "memory")
	Type() string
}

DriverConfig defines the interface that all driver-specific configurations must implement.
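
Any type with a `Type() string` method satisfies this interface. The sketch below re-declares the interface locally and shows one way calling code might branch on the concrete type. `memoryConfig` and `describe` are hypothetical, and the string returned by `RedisConfig.Type` is assumed to match the DriverRedis constant.

```go
package main

import "fmt"

// DriverConfig mirrors the documented interface.
type DriverConfig interface {
	Type() string
}

// RedisConfig is a trimmed local stand-in for the documented struct.
type RedisConfig struct {
	Addr string
	Db   int
}

// Type is assumed to return the value of the DriverRedis constant.
func (RedisConfig) Type() string { return "redis" }

// memoryConfig is a hypothetical driver config with no options.
type memoryConfig struct{}

func (memoryConfig) Type() string { return "memory" }

// describe uses a type switch for driver-specific details and falls back to
// the Type identifier for everything else.
func describe(dc DriverConfig) string {
	switch c := dc.(type) {
	case RedisConfig:
		return fmt.Sprintf("redis at %s (db %d)", c.Addr, c.Db)
	default:
		return "driver " + dc.Type()
	}
}

func main() {
	fmt.Println(describe(RedisConfig{Addr: "localhost:6379", Db: 1}))
	fmt.Println(describe(memoryConfig{}))
}
```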

type JobMetrics

type JobMetrics struct {
	// QueueName is the name of the queue the job was processed in
	QueueName string
	// JobID is the unique identifier of the processed job
	JobID string
	// Duration is how long the job took to process
	Duration time.Duration
	// Error contains any error that occurred during processing
	Error error
	// Timestamp is when the metrics were collected
	Timestamp time.Time
}

JobMetrics contains metrics data for a completed job. This is passed to the MetricsCallback when a job completes processing.

type MetricsCallback

type MetricsCallback func(metrics JobMetrics)

MetricsCallback is a function type for handling job completion metrics. It is called after a job completes processing, whether successful or not.
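
A callback of this type can feed a simple in-process summary. The sketch below re-declares `JobMetrics` and `MetricsCallback` locally and guards the counters with a mutex, on the assumption that callbacks may be invoked from several worker goroutines. `summary` is a hypothetical type.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// JobMetrics and MetricsCallback mirror the documented declarations.
type JobMetrics struct {
	QueueName string
	JobID     string
	Duration  time.Duration
	Error     error
	Timestamp time.Time
}

type MetricsCallback func(metrics JobMetrics)

// summary is a hypothetical aggregate of completed jobs.
type summary struct {
	mu      sync.Mutex
	total   int
	failed  int
	elapsed time.Duration
}

// record has the MetricsCallback signature and is safe for concurrent use.
func (s *summary) record(m JobMetrics) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.total++
	if m.Error != nil {
		s.failed++
	}
	s.elapsed += m.Duration
}

var errSample = errors.New("sample failure")

func main() {
	s := &summary{}
	var cb MetricsCallback = s.record

	cb(JobMetrics{QueueName: "emails", JobID: "1", Duration: 40 * time.Millisecond, Timestamp: time.Now()})
	cb(JobMetrics{QueueName: "emails", JobID: "2", Duration: 10 * time.Millisecond, Error: errSample, Timestamp: time.Now()})

	fmt.Println(s.total, s.failed, s.elapsed)
}
```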

type RedisConfig

type RedisConfig struct {
	// Addr is the Redis server address (e.g., "localhost:6379")
	Addr string
	// Username is the Redis server username (optional)
	Username string
	// Password is the Redis server password (optional)
	Password string
	// Db is the Redis database number to use
	Db int
}

RedisConfig contains configuration options specific to the Redis driver.

func (RedisConfig) Type

func (r RedisConfig) Type() string

Type implements the DriverConfig interface.

type SQSConfig

type SQSConfig struct {
	// QueueURL is the URL of the SQS queue
	QueueURL string
	// Region is the AWS region where the SQS queue is located
	Region string
	// AccessKeyID is the AWS access key ID for authentication
	AccessKeyID string
	// SecretAccessKey is the AWS secret access key for authentication
	SecretAccessKey string
	// MaxMessages is the maximum number of messages to retrieve in a single batch (1-10)
	MaxMessages int
	// VisibilityTimeout is the duration that messages are hidden from subsequent retrieve requests
	VisibilityTimeout time.Duration
	// IsFifo indicates whether the SQS queue is a FIFO queue
	IsFifo bool
	// MessageGroupID is the FIFO queue message group ID (required for FIFO queues)
	// If not specified, a default value of "default" will be used for FIFO queues
	MessageGroupID string
	// MessageDeduplicationID is used for FIFO queues to prevent duplicate messages (optional)
	// If not specified, a unique ID will be generated automatically for each message
	MessageDeduplicationID string
}

SQSConfig contains configuration options specific to the AWS SQS driver.
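
Two constraints in the field comments lend themselves to a normalisation step: MaxMessages must lie between 1 and 10, and a FIFO queue falls back to the message group ID "default". The sketch below applies both to a trimmed local stand-in. Whether the driver clamps out-of-range values or rejects them is not stated, so the clamping is an assumption, and `normalize` is a hypothetical helper.

```go
package main

import "fmt"

// SQSConfig is a trimmed local stand-in for the documented struct.
type SQSConfig struct {
	QueueURL       string
	MaxMessages    int
	IsFifo         bool
	MessageGroupID string
}

// normalize clamps MaxMessages to the documented 1-10 range and applies the
// documented "default" message group ID for FIFO queues.
func normalize(c SQSConfig) SQSConfig {
	if c.MaxMessages < 1 {
		c.MaxMessages = 1
	}
	if c.MaxMessages > 10 {
		c.MaxMessages = 10
	}
	if c.IsFifo && c.MessageGroupID == "" {
		c.MessageGroupID = "default"
	}
	return c
}

func main() {
	c := normalize(SQSConfig{MaxMessages: 25, IsFifo: true})
	fmt.Println(c.MaxMessages, c.MessageGroupID)
}
```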

func (SQSConfig) Type

func (s SQSConfig) Type() string

Type implements the DriverConfig interface.
