worker

package module
v0.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 5, 2018 License: MIT Imports: 8 Imported by: 7

README

Worker

GoDoc Build Status Coverage Status Go Report Card codebeat badge

Package worker adding the abstraction layer around background jobs, allows make a job periodically, observe execution time and to control concurrent execution.

Group of workers allows to control jobs start time and wait until all runned workers finished when we need stop all jobs.

Features

  • Scheduling, use one from existing worker.By* schedule functions. Supporting cron schedule spec format by robfig/cron parser.
  • Control concurrent execution around multiple instances by worker.With* lock functions. Supporting redis locks by go-redis/redis and bsm/redis-lock pkgs.
  • Observe a job execution time duration with worker.SetObserever. Friendly for prometheus/client_golang package.
  • Graceful stop, wait until all running jobs was completed.

Example

wg := worker.NewGroup()
wg.Add(
    worker.
        New(func(context.Context) {}).
        ByTicker(time.Second),

    worker.
        New(func(context.Context) {}).
        ByTimer(time.Second),

    worker.
        New(func(context.Context) {}).
        ByCronSpec("@every 1s"),
)
wg.Run()

See more examples here

Documentation

Overview

Package worker adding the abstraction layer around background jobs, allows make a job periodically, observe execution time and to control concurrent execution. Group of workers allows to control jobs start time and wait until all runned workers finished when we need stop all jobs.

Usage

Create group and workers with empty job:

wg := worker.NewGroup()
w1 := worker.New(func(context.Context) {})
w2 := worker.New(func(context.Context) {})
w3 := worker.New(func(context.Context) {})

Add workers to group and run all jobs:

wg.Add(w1, w2, w3)
wg.Run()

Stop all workers:

wg.Stop()

Periodic jobs

Set job execution period to worker (only the last will be applied)

w := worker.New(func(context.Context) {})
w.ByTicker(time.Second)
w.ByTimer(time.Second)
w.ByCronSpec("@every 1s")

or set custom schedule function

// run 3 times
w.BySchedule(func(ctx context.Context, j worker.Job) worker.Job {
	return func(ctx context.Context) {
		for i := 0; i < 3; i++ {
			j(ctx)
		}
	}
})

Exclusive jobs

Control concurrent execution around single or multiple instances by redis locks

worker.
	New(func(context.Context) {}).
	WithRedisLock(&worker.RedisLockOptions{}).
	Run(context.Background())

or set custom locker

w.WithLock(worker.Locker)

Observe execution time

Collect job execution time metrics

w.SetObserver(func(d float64) {
	fmt.Printf("time elapsed %.3fs", d)
})

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BsmRedisLockOptions

type BsmRedisLockOptions struct {
	RedisLockOptions
	RetryCount int
	RetryDelay time.Duration
}

BsmRedisLockOptions lock options for bsm/redis-lock locker

func (BsmRedisLockOptions) NewWith

func (opts BsmRedisLockOptions) NewWith(
	lockkey string,
	lockttl time.Duration,
	retries int,
	retryDelay time.Duration,
) BsmRedisLockOptions

NewWith returns new bsm redis lock options from target options

type Group

type Group struct {
	// contains filtered or unexported fields
}

Group of workers controlling background jobs execution allows graceful stop all running background jobs

func NewGroup

func NewGroup() *Group

NewGroup yield new workers group

func (*Group) Add

func (g *Group) Add(workers ...*Worker)

Add workers to group, if group runned then start worker immediately

func (*Group) Run

func (g *Group) Run()

Run starting each worker in separate goroutine with wait.Group control

func (*Group) Stop

func (g *Group) Stop()

Stop cancel workers context and wait until all runned workers was completed. Be careful! It can be deadlock if some worker hanging

type Job

type Job func(context.Context)

Job is target background job

type LockFunc

type LockFunc func(context.Context, Job) Job

LockFunc is job wrapper for control exclusive execution

func WithBsmRedisLock

func WithBsmRedisLock(opts BsmRedisLockOptions) LockFunc

WithBsmRedisLock returns job wrapper func with redis lock by bsm/redis-lock pkg

func WithLock

func WithLock(l Locker) LockFunc

WithLock returns func with call Worker in lock

func WithRedisLock

func WithRedisLock(opts RedisLockOptions) LockFunc

WithRedisLock returns job wrapper func with redis lock

type Locker

type Locker interface {
	// Lock acquire lock for job, returns error when the job should not be started
	Lock() error
	// Unlock release acquired lock
	Unlock()
}

Locker interface

type ObserveFunc

type ObserveFunc func(float64)

ObserveFunc given execution job time duration seconds

type RedisClient

type RedisClient interface {
	SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd
	Eval(script string, keys []string, args ...interface{}) *redis.Cmd
	EvalSha(sha1 string, keys []string, args ...interface{}) *redis.Cmd
	ScriptExists(scripts ...string) *redis.BoolSliceCmd
	ScriptLoad(script string) *redis.StringCmd
	Del(keys ...string) *redis.IntCmd
}

RedisClient interface define all used functions from github.com/go-redis/redis

type RedisLockLogger

type RedisLockLogger interface {
	Errorw(msg string, keysAndValues ...interface{})
	Warnw(msg string, keysAndValues ...interface{})
}

RedisLockLogger all needed logger funcs

type RedisLockOptions

type RedisLockOptions struct {
	LockKey  string
	LockTTL  time.Duration
	RedisCLI RedisClient
	Logger   RedisLockLogger
}

RedisLockOptions describe redis lock settings

func (*RedisLockOptions) GetLogger

func (opts *RedisLockOptions) GetLogger() RedisLockLogger

GetLogger return logger from options or zap.Noop logger

func (RedisLockOptions) NewWith

func (opts RedisLockOptions) NewWith(lockkey string, lockttl time.Duration) RedisLockOptions

NewWith returns new redis lock options with given key name and ttl from source options

type ScheduleFunc

type ScheduleFunc func(context.Context, Job) Job

ScheduleFunc is job wrapper for implement job run schedule

func ByCronSchedule

func ByCronSchedule(schedule string) ScheduleFunc

ByCronSchedule returns job wrapper func for run job by cron schedule using robfig/cron parser for parse cron spec. If schedule spec not valid throw panic, shit happens.

func ByTicker

func ByTicker(period time.Duration) ScheduleFunc

ByTicker returns func which run Worker by ticker each period duration

func ByTimer

func ByTimer(period time.Duration) ScheduleFunc

ByTimer returns job wrapper func for run job each period duration after previous run completed

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker is builder for job with optional schedule and exclusive control

func New

func New(job Job) *Worker

New returns new worker with target job

func (*Worker) ByCronSpec

func (w *Worker) ByCronSpec(spec string) *Worker

ByCronSpec set schedule job wrapper by cron spec

func (*Worker) BySchedule

func (w *Worker) BySchedule(s ScheduleFunc) *Worker

BySchedule set schedule wrapper func for job

func (*Worker) ByTicker

func (w *Worker) ByTicker(period time.Duration) *Worker

ByTicker set schedule ticker job wrapper with period

func (*Worker) ByTimer

func (w *Worker) ByTimer(period time.Duration) *Worker

ByTimer set schedule timer job wrapper with period

func (*Worker) Run

func (w *Worker) Run(ctx context.Context)

Run job, wrap job to metrics, lock and schedule wrappers

func (*Worker) SetImmediately

func (w *Worker) SetImmediately(executeOnRun bool) *Worker

SetImmediately set execute job on Run setting

func (*Worker) SetObserver

func (w *Worker) SetObserver(observe ObserveFunc) *Worker

SetObserver set job duration observer

func (*Worker) WithBsmRedisLock

func (w *Worker) WithBsmRedisLock(opts BsmRedisLockOptions) *Worker

WithBsmRedisLock set job lock wrapper using bsm/redis-lock pkg

func (*Worker) WithLock

func (w *Worker) WithLock(l Locker) *Worker

WithLock set job lock wrapper

func (*Worker) WithRedisLock

func (w *Worker) WithRedisLock(opts RedisLockOptions) *Worker

WithRedisLock set job lock wrapper using redis lock

Directories

Path Synopsis
examples
custom command
metrics command
redis command
!require redis on localhost:6379 docker run -p6379:6379 -d redis
!require redis on localhost:6379 docker run -p6379:6379 -d redis
locker module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL