routines

package
v1.0.1-alpha Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 7, 2019 License: Apache-2.0 Imports: 14 Imported by: 0

README

routines

This is the package for implementing the worker pool pattern.

Worker

Worker is an interface for async workers which launches and manages by the Chief.

The main ideas of the Worker:

  • this is simple and lightweight worker;
  • it can communicate with surroundings through channels, message queues, etc;
  • worker must do one or few small jobs;
  • should be able to run the worker as one independent process;

For example microservice for the image storing can have three workers:

  1. Rest API - receives and gives out images from user;
  2. Image resizer - compresses and resize the image for the thumbnails;
  3. Uploader - uploads files to the S3 storage.

All this workers are part of one microservice, in one binary and able to run them as a one process or as a three different.

Method list:
  • Init(context.Context) Worker - initializes new instance of the Worker implementation.
  • Run() - starts the Worker instance execution. This should be a blocking call, which in normal mode will be executed in goroutine.

Chief

Chief is a head of workers, it must be used to register, initialize and correctly start and stop asynchronous executors of the type Worker.

Method list:
  • AddWorker(name string, worker Worker - register a new Worker to the Chief worker pool.
  • EnableWorkers(names ...string) - enables all worker from the names list. By default, all added workers are enabled.
  • EnableWorker(name string) - enables the worker with the specified name. By default, all added workers are enabled.
  • IsEnabled(name string) bool - checks is enable worker with passed name.
  • InitWorkers(logger *logrus.Entry) - initializes all registered workers.
  • Start(parentCtx context.Context) - runs all registered workers, locks until the parentCtx closes, and then gracefully stops all workers.

In InitWorkers Chief insert in the context his logger (*logrus.Entry), so at the worker you can took by key routines.CtxKeyLog and use it.

// .....
func (w *MyWorker) Init(parentCtx context.Context) routines.Worker {
    logger, ok := parentCtx.Value(routines.CtxKeyLog).(*logrus.Entry)
    if !ok {
        // process error
    }
    // add field with worker name.
    w.Logger = logger.WithField("worker", "my-cool-worker")
    
    // ... do other stuff ... //
    
    return w
}
// .....

Usage

Just define the routines.Chief variable, register your worker using the AddWorker method. Before starting, you must initialize registered workers using the InitWorkers(*logrus.Entry) method.

A very simple example:

package main

import (
    "github.com/lancer-kit/armory/routines"
    "context"
)

var WorkersChief routines.Chief

func init()  {
    WorkersChief = routines.Chief{}
    WorkersChief.AddWorker("my-awesome-worker", &MyWorker{})
    // `MyWorker` is a type which implement `Worker` interface.
}

func main () {
    WorkersChief.InitWorkers(nil)
    ctx, cancel := context.WithCancel(context.Background())
    go func() {
        WorkersChief.Start(ctx)
    }()
    
    defer func() {
        cancel()
    }()
}

Documentation

Overview

generated by forge enum --type WorkerState; DO NOT EDIT

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrWorkerNotInitialized = errors.New("worker not initialized")
)
View Source
var ErrWorkerStateInvalid = errors.New("WorkerState is invalid")
View Source
var ForceStopTimeout = 45 * time.Second

ForceStopTimeout is a timeout for killing all workers.

Functions

This section is empty.

Types

type Chief

type Chief struct {

	// EnableByDefault sets all the working `Enabled`
	// if none of the workers is passed on to enable.
	EnableByDefault bool
	// contains filtered or unexported fields
}

Chief is a head of workers, it must be used to register, initialize and correctly start and stop asynchronous executors of the type `Worker`.

func (*Chief) AddValueToContext

func (chief *Chief) AddValueToContext(key, value interface{})

func (*Chief) AddWorker

func (chief *Chief) AddWorker(name string, worker Worker)

AddWorker register a new `Worker` to the `Chief` worker pool.

func (*Chief) EnableWorker

func (chief *Chief) EnableWorker(name string)

EnableWorker enables the worker with the specified `name`. By default, all added workers are enabled. After the first call of this method, only directly enabled workers will be active

func (*Chief) EnableWorkers

func (chief *Chief) EnableWorkers(names ...string)

EnableWorkers enables all worker from the `names` list. By default, all added workers are enabled. After the first call of this method, only directly enabled workers will be active

func (*Chief) GetContext

func (chief *Chief) GetContext() context.Context

func (*Chief) GetWorkersStates

func (chief *Chief) GetWorkersStates() map[string]WorkerState

func (*Chief) InitWorkers

func (chief *Chief) InitWorkers(logger *logrus.Entry)

InitWorkers initializes all registered workers.

func (*Chief) IsEnabled

func (chief *Chief) IsEnabled(name string) bool

IsEnabled checks is enable worker with passed `name`.

func (*Chief) RunAll

func (chief *Chief) RunAll(appName string, workers ...string) error

RunAll start worker pool and lock context until it intercepts `syscall.SIGTERM`, `syscall.SIGINT`. NOTE: Use this method ONLY as a top-level action.

func (*Chief) Start

func (chief *Chief) Start(parentCtx context.Context)

Start runs all registered workers, locks until the `parentCtx` closes, and then gracefully stops all workers.

type CtxKey

type CtxKey string

CtxKey is the type of context keys for the values placed by`Chief`.

const (
	// CtxKeyLog is a context key for a `*logrus.Entry` value.
	CtxKeyLog CtxKey = "chief-log"
)

type Worker

type Worker interface {
	// Init initializes new instance of the `Worker` implementation.
	Init(context.Context) Worker
	// RestartOnFail determines the need to restart the worker, if it stopped.
	RestartOnFail() bool
	// Run starts the `Worker` instance execution.
	Run() //todo(mike): add result or error
}

Worker is an interface for async workers which launches and manages by the `Chief`.

type WorkerExistRule

type WorkerExistRule struct {
	AvailableWorkers map[string]struct{}
	// contains filtered or unexported fields
}

func (*WorkerExistRule) Error

func (r *WorkerExistRule) Error(message string) *WorkerExistRule

Error sets the error message for the rule.

func (*WorkerExistRule) Validate

func (r *WorkerExistRule) Validate(value interface{}) error

Validate checks that service exist on the system

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool is

func (*WorkerPool) DisableWorker

func (pool *WorkerPool) DisableWorker(name string)

DisableWorker sets state `WorkerDisabled` for workers with the specified `name`.

func (*WorkerPool) EnableWorker

func (pool *WorkerPool) EnableWorker(name string)

EnableWorker sets state `WorkerEnabled` for workers with the specified `name`.

func (*WorkerPool) FailWorker

func (pool *WorkerPool) FailWorker(name string)

FailWorker sets state `WorkerFailed` for workers with the specified `name`.

func (*WorkerPool) GetState

func (pool *WorkerPool) GetState(name string) WorkerState

GetState - get Worker state by name

func (*WorkerPool) GetWorker

func (pool *WorkerPool) GetWorker(name string) Worker

GetWorker - get Worker interface by name

func (*WorkerPool) GetWorkersStates

func (pool *WorkerPool) GetWorkersStates() map[string]WorkerState

func (*WorkerPool) InitWorker

func (pool *WorkerPool) InitWorker(name string, ctx context.Context)

InitWorker initializes all present workers.

func (*WorkerPool) IsAlive

func (pool *WorkerPool) IsAlive(name string) bool

IsAlive checks is active worker with passed `name`.

func (*WorkerPool) IsEnabled

func (pool *WorkerPool) IsEnabled(name string) bool

IsEnabled checks is enable worker with passed `name`.

func (*WorkerPool) RunWorkerExec

func (pool *WorkerPool) RunWorkerExec(name string) (err error)

RunWorkerExec adds worker into pool.

func (*WorkerPool) SetState

func (pool *WorkerPool) SetState(name string, state WorkerState)

SetState updates state of specified worker.

func (*WorkerPool) SetWorker

func (pool *WorkerPool) SetWorker(name string, worker Worker)

SetWorker adds worker into pool.

func (*WorkerPool) StartWorker

func (pool *WorkerPool) StartWorker(name string)

StartWorker sets state `WorkerEnabled` for workers with the specified `name`.

func (*WorkerPool) StopWorker

func (pool *WorkerPool) StopWorker(name string)

StopWorker sets state `WorkerStopped` for workers with the specified `name`.

type WorkerState

type WorkerState int32
const (
	WorkerWrongStateChange WorkerState = -1
	WorkerNull             WorkerState = iota
	WorkerDisabled
	WorkerPresent
	WorkerEnabled
	WorkerInitialized
	WorkerRun
	WorkerStopped
	WorkerFailed
)

func (WorkerState) MarshalJSON

func (r WorkerState) MarshalJSON() ([]byte, error)

MarshalJSON is generated so WorkerState satisfies json.Marshaler.

func (*WorkerState) Scan

func (r *WorkerState) Scan(src interface{}) error

Value is generated so WorkerState satisfies db row driver.Scanner.

func (WorkerState) String

func (r WorkerState) String() string

String is generated so WorkerState satisfies fmt.Stringer.

func (*WorkerState) UnmarshalJSON

func (r *WorkerState) UnmarshalJSON(data []byte) error

UnmarshalJSON is generated so WorkerState satisfies json.Unmarshaler.

func (WorkerState) Validate

func (r WorkerState) Validate() error

Validate verifies that value is predefined for WorkerState.

func (WorkerState) Value

func (r WorkerState) Value() (driver.Value, error)

Value is generated so WorkerState satisfies db row driver.Valuer.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL