Documentation ¶
Overview ¶
Package worker adds an abstraction layer around background jobs. It lets you run a job periodically, observe its execution time, and control concurrent execution. A group of workers lets you control when jobs start and, when all jobs must be stopped, wait until every running worker has finished.
Usage ¶
Create group and workers with empty job:
wg := worker.NewGroup()
w1 := worker.New(func(context.Context) {})
w2 := worker.New(func(context.Context) {})
w3 := worker.New(func(context.Context) {})
Add workers to group and run all jobs:
wg.Add(w1, w2, w3)
wg.Run()
Stop all workers:
wg.Stop()
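The start/stop behaviour described above can be illustrated with a minimal, standard-library-only sketch. The miniGroup type and the runAndStop helper are hypothetical and written only for this illustration; they are not the package's Group implementation, whose internals are not shown here. The sketch assumes that stopping means cancelling a shared context and waiting for every job to return.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// Job mirrors the job shape used in the usage example: a function taking a context.
type Job func(context.Context)

// miniGroup is a hypothetical stand-in for a worker group, not the package's Group.
type miniGroup struct {
	jobs   []Job
	wg     sync.WaitGroup
	cancel context.CancelFunc
}

// Add registers jobs to be started by Run.
func (g *miniGroup) Add(jobs ...Job) { g.jobs = append(g.jobs, jobs...) }

// Run starts every registered job in its own goroutine with a shared context.
func (g *miniGroup) Run() {
	ctx, cancel := context.WithCancel(context.Background())
	g.cancel = cancel
	for _, j := range g.jobs {
		g.wg.Add(1)
		go func(j Job) {
			defer g.wg.Done()
			j(ctx)
		}(j)
	}
}

// Stop cancels the shared context and blocks until all jobs have returned.
func (g *miniGroup) Stop() {
	g.cancel()
	g.wg.Wait()
}

// runAndStop starts n jobs that block until cancellation and reports how many finished.
func runAndStop(n int) int64 {
	var finished int64
	g := &miniGroup{}
	for i := 0; i < n; i++ {
		g.Add(func(ctx context.Context) {
			<-ctx.Done() // simulate a long-running job
			atomic.AddInt64(&finished, 1)
		})
	}
	g.Run()
	g.Stop()
	return atomic.LoadInt64(&finished)
}

func main() {
	fmt.Println(runAndStop(3))
}
```

Because Stop waits on the WaitGroup, the counter is read only after every job has observed the cancellation and returned.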
Periodic jobs ¶
Set the worker's job execution period (only the last schedule set is applied):
w := worker.New(func(context.Context) {})
w.ByTicker(time.Second)
w.ByTimer(time.Second)
w.ByCronSpec("@every 1s")
or set a custom schedule function:
// run 3 times
w.BySchedule(func(ctx context.Context, j worker.Job) worker.Job {
	return func(ctx context.Context) {
		for i := 0; i < 3; i++ {
			j(ctx)
		}
	}
})
Exclusive jobs ¶
Control concurrent execution across a single instance or multiple instances with Redis locks:
worker.
	New(func(context.Context) {}).
	WithRedisLock(worker.RedisLockOptions{}).
	Run(context.Background())
or set a custom locker:
w.WithLock(locker) // locker is any value implementing worker.Locker
Observe execution time ¶
Collect job execution time metrics
w.SetObserver(func(d float64) {
	fmt.Printf("time elapsed %.3fs", d)
})
Index ¶
- type BsmRedisLockOptions
- type Group
- type Job
- type LockFunc
- type Locker
- type ObserveFunc
- type RedisClient
- type RedisLockLogger
- type RedisLockOptions
- type ScheduleFunc
- type Worker
- func (w *Worker) ByCronSpec(spec string) *Worker
- func (w *Worker) BySchedule(s ScheduleFunc) *Worker
- func (w *Worker) ByTicker(period time.Duration) *Worker
- func (w *Worker) ByTimer(period time.Duration) *Worker
- func (w *Worker) Run(ctx context.Context)
- func (w *Worker) SetImmediately(executeOnRun bool) *Worker
- func (w *Worker) SetObserver(observe ObserveFunc) *Worker
- func (w *Worker) WithBsmRedisLock(opts BsmRedisLockOptions) *Worker
- func (w *Worker) WithLock(l Locker) *Worker
- func (w *Worker) WithRedisLock(opts RedisLockOptions) *Worker
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BsmRedisLockOptions ¶
type BsmRedisLockOptions struct {
	RedisLockOptions
	RetryCount int
	RetryDelay time.Duration
}
BsmRedisLockOptions holds the lock options for the bsm/redis-lock locker.
func (BsmRedisLockOptions) NewWith ¶
func (opts BsmRedisLockOptions) NewWith(lockkey string, lockttl time.Duration, retries int, retryDelay time.Duration) BsmRedisLockOptions
NewWith returns new bsm/redis-lock options derived from the receiver, with the given lock key, TTL, retry count, and retry delay.
type Group ¶
type Group struct {
	// contains filtered or unexported fields
}
Group is a group of workers that controls background job execution and allows all running background jobs to be stopped gracefully.
type LockFunc ¶
LockFunc is a job wrapper that controls exclusive execution.
func WithBsmRedisLock ¶
func WithBsmRedisLock(opts BsmRedisLockOptions) LockFunc
WithBsmRedisLock returns a job wrapper func that uses a Redis lock provided by the bsm/redis-lock package.
func WithRedisLock ¶
func WithRedisLock(opts RedisLockOptions) LockFunc
WithRedisLock returns a job wrapper func that uses a Redis lock.
type Locker ¶
type Locker interface {
	// Lock acquires the lock for the job; it returns an error when the job should not be started.
	Lock() error
	// Unlock releases the acquired lock.
	Unlock()
}
Locker is the interface a custom lock must implement.
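As an illustration of the Locker contract, the following sketch implements an in-process lock that rejects a second run while the first is still in progress. The Locker interface is repeated so the example compiles on its own; localLocker, errBusy, and runExclusive are hypothetical names, and runExclusive only approximates how a worker might use a locker (the package's actual wrapping logic is not shown in this document).

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Locker repeats the documented interface so the sketch is self-contained.
type Locker interface {
	Lock() error
	Unlock()
}

var errBusy = errors.New("job is already running")

// localLocker is a hypothetical in-process Locker: Lock fails instead of
// blocking when the lock is already held.
type localLocker struct {
	mu   sync.Mutex
	held bool
}

func (l *localLocker) Lock() error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.held {
		return errBusy
	}
	l.held = true
	return nil
}

func (l *localLocker) Unlock() {
	l.mu.Lock()
	l.held = false
	l.mu.Unlock()
}

// runExclusive runs job only if the lock can be acquired and reports whether it ran.
func runExclusive(l Locker, job func()) bool {
	if err := l.Lock(); err != nil {
		return false
	}
	defer l.Unlock()
	job()
	return true
}

func main() {
	l := &localLocker{}
	outer := runExclusive(l, func() {
		// A nested attempt while the lock is held is skipped.
		fmt.Println("inner ran:", runExclusive(l, func() {}))
	})
	fmt.Println("outer ran:", outer)
}
```

A lock like this only protects a single process; coordinating several instances needs a shared store, which is the role of the Redis-based lockers.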
type ObserveFunc ¶
type ObserveFunc func(float64)
ObserveFunc receives the job execution duration in seconds.
type RedisClient ¶
type RedisClient interface {
	SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd
	Eval(script string, keys []string, args ...interface{}) *redis.Cmd
	EvalSha(sha1 string, keys []string, args ...interface{}) *redis.Cmd
	ScriptExists(scripts ...string) *redis.BoolSliceCmd
	ScriptLoad(script string) *redis.StringCmd
	Del(keys ...string) *redis.IntCmd
}
RedisClient is an interface that defines all the functions used from github.com/go-redis/redis.
type RedisLockLogger ¶
type RedisLockLogger interface {
	Errorw(msg string, keysAndValues ...interface{})
	Warnw(msg string, keysAndValues ...interface{})
}
RedisLockLogger defines the logger functions the Redis lock needs.
type RedisLockOptions ¶
type RedisLockOptions struct {
	LockKey  string
	LockTTL  time.Duration
	RedisCLI RedisClient
	Logger   RedisLockLogger
}
RedisLockOptions describes the Redis lock settings.
func (*RedisLockOptions) GetLogger ¶
func (opts *RedisLockOptions) GetLogger() RedisLockLogger
GetLogger returns the logger from the options, or a no-op zap logger.
func (RedisLockOptions) NewWith ¶
func (opts RedisLockOptions) NewWith(lockkey string, lockttl time.Duration) RedisLockOptions
NewWith returns new Redis lock options with the given key name and TTL, derived from the source options.
type ScheduleFunc ¶
ScheduleFunc is a job wrapper that implements the job's run schedule.
func ByCronSchedule ¶
func ByCronSchedule(schedule string) ScheduleFunc
ByCronSchedule returns a job wrapper func that runs the job on a cron schedule, using the robfig/cron parser to parse the cron spec. It panics if the schedule spec is not valid.
func ByTicker ¶
func ByTicker(period time.Duration) ScheduleFunc
ByTicker returns a job wrapper func that runs the job on a ticker, once every period.
func ByTimer ¶
func ByTimer(period time.Duration) ScheduleFunc
ByTimer returns a job wrapper func that runs the job once every period, counted from the completion of the previous run.
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker is a builder for a job with an optional schedule and exclusive execution control.
func (*Worker) ByCronSpec ¶
func (w *Worker) ByCronSpec(spec string) *Worker
ByCronSpec sets the schedule job wrapper from a cron spec.
func (*Worker) BySchedule ¶
func (w *Worker) BySchedule(s ScheduleFunc) *Worker
BySchedule sets the schedule wrapper func for the job.
func (*Worker) SetImmediately ¶
func (w *Worker) SetImmediately(executeOnRun bool) *Worker
SetImmediately sets whether the job is executed immediately on Run.
func (*Worker) SetObserver ¶
func (w *Worker) SetObserver(observe ObserveFunc) *Worker
SetObserver sets the job duration observer.
func (*Worker) WithBsmRedisLock ¶
func (w *Worker) WithBsmRedisLock(opts BsmRedisLockOptions) *Worker
WithBsmRedisLock sets the job lock wrapper using the bsm/redis-lock package.
func (*Worker) WithRedisLock ¶
func (w *Worker) WithRedisLock(opts RedisLockOptions) *Worker
WithRedisLock sets the job lock wrapper using a Redis lock.
