Documentation
¶
Overview ¶
Neoq does not aim to be the _fastest_ background job processor. It aims to be _fast_, _reliable_, and demand a _minimal infrastructure footprint_.
Index ¶
- Constants
- Variables
- type BackendInitializer
- type Config
- type ConfigOption
- func WithBackend(initializer BackendInitializer) ConfigOption
- func WithJobCheckInterval(interval time.Duration) ConfigOption
- func WithLogLevel(level logging.LogLevel) ConfigOption
- func WithQueueListenerChanBufferSize(bufferSize int) ConfigOption
- func WithRecoveryCallback(cb handler.RecoveryCallback) ConfigOption
- type Neoq
Examples ¶
Constants ¶
const ( DefaultIdleTxTimeout = 30000 // the window of time between time.Now() and when a job's RunAfter comes due that neoq will schedule a goroutine to // schdule the job for execution. // E.g. right now is 16:00 and a job's RunAfter is 16:30 of the same date. This job will get a dedicated goroutine // to wait until the job's RunAfter, scheduling the job to be run exactly at RunAfter DefaultFutureJobWindow = 30 * time.Second DefaultJobCheckInterval = 1 * time.Second DefaultQueueListenerChanBufferSize = 1000 )
Variables ¶
var ErrBackendNotSpecified = errors.New("a backend must be specified")
Functions ¶
This section is empty.
Types ¶
type BackendInitializer ¶
type BackendInitializer func(ctx context.Context, opts ...ConfigOption) (backend Neoq, err error)
BackendInitializer is a function that initializes a backend
type Config ¶
type Config struct {
BackendInitializer BackendInitializer
BackendAuthPassword string // password with which to authenticate to the backend
BackendConcurrency int // total number of backend processes available to process jobs
ConnectionString string // a string containing connection details for the backend
JobCheckInterval time.Duration // the interval of time between checking for new future/retry jobs
FutureJobWindow time.Duration // time duration between current time and job.RunAfter that goroutines schedule for future jobs
IdleTransactionTimeout int // the number of milliseconds PgBackend transaction may idle before the connection is killed
ShutdownTimeout time.Duration // duration to wait for jobs to finish during shutdown
SynchronousCommit bool // Postgres: Enable synchronous commits (increases durability, decreases performance)
LogLevel logging.LogLevel // the log level of the default logger
PGConnectionTimeout time.Duration // the amount of time to wait for a connection to become available before timing out
RecoveryCallback handler.RecoveryCallback // the recovery handler applied to all Handlers excuted by the associated Neoq instance
QueueListenerChanBufferSize int // buffer size for queueListenerChan
}
Config configures neoq and its backends
This configuration struct includes options for all backends. As such, some of its options are not applicable to all backends. [BackendConcurrency], for example, is only used by the redis backend. Other backends manage concurrency on a per-handler basis.
type ConfigOption ¶
type ConfigOption func(c *Config)
ConfigOption is a function that sets optional backend configuration
func WithBackend ¶
func WithBackend(initializer BackendInitializer) ConfigOption
WithBackend configures neoq to initialize a specific backend for job processing.
Neoq provides two [config.BackendInitializer] that may be used with WithBackend
- pkg/github.com/acaloiaro/neoq/backends/memory.Backend
- pkg/github.com/acaloiaro/neoq/backends/postgres.Backend
Example ¶
ctx := context.Background()
nq, err := neoq.New(ctx, neoq.WithBackend(memory.Backend))
if err != nil {
fmt.Println("initializing a new Neoq with no params should not return an error:", err)
return
}
defer nq.Shutdown(ctx)
fmt.Println("neoq initialized with memory backend")
Output: neoq initialized with memory backend
Example (Postgres) ¶
ctx := context.Background()
var pgURL string
var ok bool
if pgURL, ok = os.LookupEnv("TEST_DATABASE_URL"); !ok {
fmt.Println("Please set TEST_DATABASE_URL environment variable")
return
}
nq, err := neoq.New(ctx, neoq.WithBackend(postgres.Backend), postgres.WithConnectionString(pgURL))
if err != nil {
fmt.Println("initializing a new Neoq with no params should not return an error:", err)
return
}
defer nq.Shutdown(ctx)
fmt.Println("neoq initialized with postgres backend")
Output: neoq initialized with postgres backend
func WithJobCheckInterval ¶
func WithJobCheckInterval(interval time.Duration) ConfigOption
WithJobCheckInterval configures the duration of time between checking for future jobs
func WithLogLevel ¶
func WithLogLevel(level logging.LogLevel) ConfigOption
WithLogLevel configures the log level for neoq's default logger. By default, log level is "INFO". if SetLogger is used, WithLogLevel has no effect on the set logger
func WithQueueListenerChanBufferSize ¶
func WithQueueListenerChanBufferSize(bufferSize int) ConfigOption
func WithRecoveryCallback ¶
func WithRecoveryCallback(cb handler.RecoveryCallback) ConfigOption
WithRecoveryCallback configures neoq with a function to be called when fatal errors occur in job Handlers.
Recovery callbacks are useful for reporting errors to error loggers and collecting error metrics
type Neoq ¶
type Neoq interface {
// Enqueue queues jobs to be executed asynchronously
Enqueue(ctx context.Context, job *jobs.Job) (jobID string, err error)
// Start starts processing jobs on the queue specified in the Handler
Start(ctx context.Context, h handler.Handler) (err error)
// StartCron starts processing jobs with the specified cron schedule and handler
//
// See: https://pkg.go.dev/github.com/robfig/cron?#hdr-CRON_Expression_Format for details on the cron spec format
StartCron(ctx context.Context, cron string, h handler.Handler) (err error)
// SetLogger sets the backend logger
SetLogger(logger logging.Logger)
// Shutdown halts job processing and releases resources
Shutdown(ctx context.Context)
}
Neoq interface is Neoq's primary API
Neoq is implemented by:
- pkg/github.com/acaloiaro/neoq/backends/memory.MemBackend
- pkg/github.com/acaloiaro/neoq/backends/postgres.PgBackend
- pkg/github.com/acaloiaro/neoq/backends/redis.RedisBackend
func New ¶
func New(ctx context.Context, opts ...ConfigOption) (b Neoq, err error)
New creates a new backend instance for job processing.
By default, neoq initializes [memory.Backend] if New() is called without a backend configuration option.
Use neoq.WithBackend to initialize different backends.
For available configuration options see neoq.ConfigOption.
Example ¶
ctx := context.Background()
nq, err := neoq.New(ctx, neoq.WithBackend(memory.Backend))
if err != nil {
fmt.Println("initializing a new Neoq with no params should not return an error:", err)
return
}
defer nq.Shutdown(ctx)
fmt.Println("neoq initialized with default memory backend")
Output: neoq initialized with default memory backend
Example (Postgres) ¶
ctx := context.Background()
var pgURL string
var ok bool
if pgURL, ok = os.LookupEnv("TEST_DATABASE_URL"); !ok {
fmt.Println("Please set TEST_DATABASE_URL environment variable")
return
}
nq, err := neoq.New(ctx, neoq.WithBackend(postgres.Backend), postgres.WithConnectionString(pgURL))
if err != nil {
fmt.Println("neoq's postgres backend failed to initialize:", err)
return
}
defer nq.Shutdown(ctx)
fmt.Println("neoq initialized with postgres backend")
Output: neoq initialized with postgres backend
Directories
¶
| Path | Synopsis |
|---|---|
|
Package backends provides concrete implementations of pkg/github.com/acaloiaro/neoq/neoq.Neoq
|
Package backends provides concrete implementations of pkg/github.com/acaloiaro/neoq/neoq.Neoq |
|
examples
|
|
|
add_future_postgres_job
command
|
|
|
add_job_with_custom_concurrency
command
|
|
|
add_job_with_deadline
command
|
|
|
add_job_with_timeout
command
|
|
|
add_periodic_jobs
command
|
|
|
add_postgres_job
command
|
|
|
add_redis_job
command
|
|
|
start_processing_jobs
command
|
|