uwe

package module
v1.0.0 Latest
Published: Aug 9, 2019 License: Apache-2.0 Imports: 11 Imported by: 0

README

uwe

United Workers Environment

WARN: the current state is unstable and does not work. Use routines from Armory upstream.


Worker

Worker is an interface for async workers that are launched and managed by the Chief.

The main ideas of the Worker:

  • a worker is simple and lightweight;
  • it can communicate with its surroundings through channels, message queues, etc.;
  • a worker should do one or a few small jobs;
  • it should be possible to run a worker as an independent process.

For example, a microservice for image storage can have three workers:

  1. REST API - receives images from users and serves images to them;
  2. Image resizer - compresses and resizes images for thumbnails;
  3. Uploader - uploads files to the S3 storage.

All these workers are part of one microservice, built into one binary, and can be run either as one process or as three separate processes.

Method list:
  • Init(context.Context) Worker - initializes a new instance of the Worker implementation.
  • Run() - starts execution of the Worker instance. This should be a blocking call, which in normal mode is executed in a goroutine.
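
The two methods above can be sketched as follows. This is only an illustration: the Worker interface below is a local stand-in declared for the example, not the type exported by the package, and counterWorker and runCounter are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// Worker is a local stand-in for the two-method interface described above.
type Worker interface {
	Init(ctx context.Context) Worker
	Run()
}

// counterWorker is a hypothetical worker that sums numbers read from a channel.
type counterWorker struct {
	in    <-chan int
	total int
	done  chan struct{}
}

// Init prepares the worker and returns it as a Worker.
func (w *counterWorker) Init(ctx context.Context) Worker {
	w.done = make(chan struct{})
	return w
}

// Run blocks until the input channel is closed.
func (w *counterWorker) Run() {
	defer close(w.done)
	for n := range w.in {
		w.total += n
	}
}

// runCounter plays the role of the caller: it initializes the worker,
// runs it in a goroutine, feeds it values, and waits for it to finish.
func runCounter(values []int) int {
	in := make(chan int)
	w := &counterWorker{in: in}
	worker := w.Init(context.Background())
	go worker.Run()
	for _, v := range values {
		in <- v
	}
	close(in)
	<-w.done
	return w.total
}

func main() {
	fmt.Println(runCounter([]int{1, 2, 3})) // prints 6
}
```

The worker communicates only through its channel, which matches the idea that a worker is small and can talk to its surroundings through channels or queues.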

Chief

Chief is the head of the workers; it must be used to register, initialize, and correctly start and stop asynchronous executors of type Worker.

Method list:
  • AddWorker(name string, worker Worker) - registers a new Worker in the Chief worker pool.
  • EnableWorkers(names ...string) - enables all workers from the names list. By default, all added workers are enabled.
  • EnableWorker(name string) - enables the worker with the specified name. By default, all added workers are enabled.
  • IsEnabled(name string) bool - reports whether the worker with the passed name is enabled.
  • InitWorkers(logger *logrus.Entry) - initializes all registered workers.
  • Start(parentCtx context.Context) - runs all registered workers, blocks until parentCtx is closed, and then gracefully stops all workers.

In InitWorkers, Chief puts its logger (*logrus.Entry) into the context, so a worker can retrieve it by the key routines.CtxKeyLog and use it.

// .....
func (w *MyWorker) Init(parentCtx context.Context) routines.Worker {
    logger, ok := parentCtx.Value(routines.CtxKeyLog).(*logrus.Entry)
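    if !ok {
        // No logger in the context: fall back to a fresh one
        // so that the call below does not dereference nil.
        logger = logrus.NewEntry(logrus.New())
    }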
    // add field with worker name.
    w.Logger = logger.WithField("worker", "my-cool-worker")
    
    // ... do other stuff ... //
    
    return w
}
// .....

Usage

Define a routines.Chief variable and register your worker using the AddWorker method. Before starting, you must initialize the registered workers using the InitWorkers(*logrus.Entry) method.

A very simple example:

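package main

import (
    "context"
    "os"
    "os/signal"
    "syscall"

    "github.com/lancer-kit/armory/routines"
)

var WorkersChief routines.Chief

func init() {
    WorkersChief = routines.Chief{}
    // `MyWorker` is a type that implements the `Worker` interface.
    WorkersChief.AddWorker("my-awesome-worker", &MyWorker{})
}

func main() {
    WorkersChief.InitWorkers(nil)
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Cancel the context on SIGINT or SIGTERM so that the workers are stopped.
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
    go func() {
        <-signals
        cancel()
    }()

    // Start blocks until ctx is cancelled, so main does not exit early.
    WorkersChief.Start(ctx)
}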

Documentation

Index

Constants

const (
	WStateDisabled    sam.State = "Disabled"
	WStateEnabled     sam.State = "Enabled"
	WStateInitialized sam.State = "Initialized"
	WStateRun         sam.State = "Run"
	WStateStopped     sam.State = "Stopped"
	WStateFailed      sam.State = "Failed"
)

Variables

var (
	ErrWorkerNotExist = func(name WorkerName) error {
		return fmt.Errorf("%s: not exist", name)
	}
)
var ForceStopTimeout = 45 * time.Second

ForceStopTimeout is a timeout for killing all workers.

var WorkersStates = map[sam.State]struct{}{
	WStateDisabled:    {},
	WStateEnabled:     {},
	WStateInitialized: {},
	WStateRun:         {},
	WStateStopped:     {},
	WStateFailed:      {},
}

WorkersStates is the set of valid worker states.
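
A map with empty struct values is a common way to express a set in Go. The sketch below shows a membership check against such a set; State is a local string type standing in for sam.State, and isValidState is a hypothetical helper.

```go
package main

import "fmt"

// State is a local stand-in for sam.State.
type State string

const (
	WStateDisabled    State = "Disabled"
	WStateEnabled     State = "Enabled"
	WStateInitialized State = "Initialized"
	WStateRun         State = "Run"
	WStateStopped     State = "Stopped"
	WStateFailed      State = "Failed"
)

// workersStates mirrors the set of valid states listed above.
var workersStates = map[State]struct{}{
	WStateDisabled:    {},
	WStateEnabled:     {},
	WStateInitialized: {},
	WStateRun:         {},
	WStateStopped:     {},
	WStateFailed:      {},
}

// isValidState reports whether s is one of the known worker states.
func isValidState(s State) bool {
	_, ok := workersStates[s]
	return ok
}

func main() {
	fmt.Println(isValidState(WStateRun)) // prints true
	fmt.Println(isValidState("Paused"))  // prints false
}
```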

Functions

This section is empty.

Types

type Chief

type Chief struct {

	// EnableByDefault sets all workers to `Enabled`
	// if no workers are explicitly passed to enable.
	EnableByDefault bool
	// AppName is the main app identifier of the instance, used for the logger, etc.
	AppName string
	// contains filtered or unexported fields
}

Chief is the head of the workers; it must be used to register, initialize, and correctly start and stop asynchronous executors of type `Worker`.

func NewChief

func NewChief(name string, enableByDefault bool, logger *logrus.Entry) *Chief

NewChief creates and initializes a new instance of `Chief`.

func (*Chief) AddValueToContext

func (chief *Chief) AddValueToContext(key, value interface{})

func (*Chief) AddWorker

func (chief *Chief) AddWorker(name WorkerName, worker Worker)

AddWorker registers a new `Worker` in the `Chief` worker pool.

func (*Chief) EnableWorker

func (chief *Chief) EnableWorker(name WorkerName) error

EnableWorker enables the worker with the specified `name`. By default, all added workers are enabled. After the first call of this method, only explicitly enabled workers will be active.

func (*Chief) EnableWorkers

func (chief *Chief) EnableWorkers(names ...WorkerName) (err error)

EnableWorkers enables all workers from the `names` list. By default, all added workers are enabled. After the first call of this method, only explicitly enabled workers will be active.
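
The rule "everything is enabled until something is enabled explicitly" can be modelled as below. This is one possible reading of the description, not the package's implementation; registry, newRegistry, enable and isEnabled are hypothetical names.

```go
package main

import "fmt"

// registry is a hypothetical model of the enable-by-default rule.
type registry struct {
	workers  map[string]bool // name -> explicitly enabled
	explicit bool            // true once enable has been called with names
}

func newRegistry(names ...string) *registry {
	r := &registry{workers: make(map[string]bool)}
	for _, n := range names {
		r.workers[n] = false
	}
	return r
}

// enable marks the named workers as explicitly enabled.
// It fails without changing anything if any name is unknown.
func (r *registry) enable(names ...string) error {
	for _, n := range names {
		if _, ok := r.workers[n]; !ok {
			return fmt.Errorf("%s: not exist", n)
		}
	}
	for _, n := range names {
		r.workers[n] = true
	}
	if len(names) > 0 {
		r.explicit = true
	}
	return nil
}

// isEnabled applies the default: all registered workers are enabled
// until the first explicit enable call.
func (r *registry) isEnabled(name string) bool {
	enabled, ok := r.workers[name]
	if !ok {
		return false
	}
	return !r.explicit || enabled
}

func main() {
	r := newRegistry("api", "resizer")
	fmt.Println(r.isEnabled("resizer")) // prints true
	if err := r.enable("api"); err != nil {
		fmt.Println(err)
	}
	fmt.Println(r.isEnabled("resizer")) // prints false
}
```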

func (*Chief) GetContext

func (chief *Chief) GetContext() context.Context

func (*Chief) GetWorkersStates

func (chief *Chief) GetWorkersStates() map[WorkerName]sam.State

func (*Chief) Init

func (chief *Chief) Init(logger *logrus.Entry) *Chief

Init initializes all internal states properly.

func (*Chief) IsEnabled

func (chief *Chief) IsEnabled(name WorkerName) bool

IsEnabled reports whether the worker with the passed `name` is enabled.

func (*Chief) Run

func (chief *Chief) Run(workers ...WorkerName) error

Run enables the passed workers, starts the worker pool, and blocks until it intercepts `syscall.SIGTERM` or `syscall.SIGINT`. NOTE: Use this method ONLY as a top-level action.

func (*Chief) RunWithContext

func (chief *Chief) RunWithContext(ctx context.Context, workers ...WorkerName) error

func (*Chief) RunWithLocker

func (chief *Chief) RunWithLocker(locker func(), workers ...WorkerName) (err error)

For RunWithLocker, the `locker` function should block the execution context and wait for a signal to stop.
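
A locker is just a function that blocks until it is time to stop. The sketch below builds one from a channel; runWithLocker is a hypothetical stand-in that only illustrates the start, block, stop order, and channelLocker and demo are illustrative names as well.

```go
package main

import "fmt"

// channelLocker returns a locker that blocks until stop is closed.
func channelLocker(stop <-chan struct{}) func() {
	return func() { <-stop }
}

// runWithLocker is a hypothetical stand-in for the method above:
// it starts the work, blocks inside locker, and then stops the work.
func runWithLocker(locker func(), start, stop func()) {
	start()
	locker()
	stop()
}

// demo records the order of events for a single run.
func demo() []string {
	var events []string
	stopCh := make(chan struct{})
	runWithLocker(
		channelLocker(stopCh),
		func() {
			events = append(events, "started")
			// Simulate an external stop request arriving later.
			go func() { close(stopCh) }()
		},
		func() { events = append(events, "stopped") },
	)
	return events
}

func main() {
	fmt.Println(demo()) // prints [started stopped]
}
```

In a real program the locker would more likely wait on an OS signal or on a context's Done channel.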

func (*Chief) StartPool

func (chief *Chief) StartPool(parentCtx context.Context) int

StartPool runs all registered workers, blocks until `parentCtx` is closed, and then gracefully stops all workers. It returns a result code:

-1 — start failed
 0 — stopped properly

type CtxKey

type CtxKey string

CtxKey is the type of context keys for the values placed by `Chief`.

const (
	// CtxKeyLog is a context key for a `*logrus.Entry` value.
	CtxKeyLog CtxKey = "chief-log"
)

type ExitCode

type ExitCode int
const (
	// ExitCodeOk means that the worker is stopped.
	ExitCodeOk ExitCode = iota
	// ExitCodeInterrupted means that the work cycle has been interrupted and can be restarted.
	ExitCodeInterrupted
	// ExitCodeFailed means that the worker fails.
	ExitCodeFailed
	// ExitReinitReq means that the worker can't do its job and requires reinitialization.
	ExitReinitReq
)
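
One plausible way for a supervisor to act on these exit codes is sketched below. It follows the comments on the constants and the RestartOnFail method of Worker, but it is an assumption about the intended behaviour, not the package's scheduler; action and nextAction are hypothetical names.

```go
package main

import "fmt"

// ExitCode and its values mirror the declarations above.
type ExitCode int

const (
	ExitCodeOk ExitCode = iota
	ExitCodeInterrupted
	ExitCodeFailed
	ExitReinitReq
)

// action is a hypothetical description of what a supervisor does next.
type action string

const (
	actionStop    action = "stop"
	actionRestart action = "restart"
	actionReinit  action = "reinit"
)

// nextAction maps an exit code to a follow-up step.
// restartOnFail stands for the result of the worker's RestartOnFail method.
func nextAction(code ExitCode, restartOnFail bool) action {
	switch code {
	case ExitCodeInterrupted:
		return actionRestart
	case ExitCodeFailed:
		if restartOnFail {
			return actionRestart
		}
		return actionStop
	case ExitReinitReq:
		return actionReinit
	default:
		// ExitCodeOk and unknown codes: leave the worker stopped.
		return actionStop
	}
}

func main() {
	fmt.Println(nextAction(ExitCodeOk, true))      // prints stop
	fmt.Println(nextAction(ExitCodeFailed, true))  // prints restart
	fmt.Println(nextAction(ExitReinitReq, false))  // prints reinit
}
```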

type Message

type Message struct {
	UID    int64
	Target WorkerName
	Sender WorkerName
	Data   interface{}
}

type WContext

type WContext interface {
	context.Context
	SendMessage(target WorkerName, data interface{}) error
	MessageBus() <-chan *Message
}

func NewContext

func NewContext(name WorkerName, ctx context.Context, in, out chan *Message) WContext

type Worker

type Worker interface {
	// Init initializes new instance of the `Worker` implementation,
	// this context should be used only as Key/Value transmitter,
	// DO NOT use it for `<- ctx.Done()`
	Init(ctx context.Context) Worker
	// RestartOnFail determines the need to restart the worker, if it stopped.
	RestartOnFail() bool
	// Run starts the `Worker` instance execution.
	Run(ctx WContext) ExitCode
}

Worker is an interface for async workers that are launched and managed by the `Chief`.

type WorkerExistRule

type WorkerExistRule struct {
	AvailableWorkers map[WorkerName]struct{}
	// contains filtered or unexported fields
}

func (*WorkerExistRule) Error

func (r *WorkerExistRule) Error(message string) *WorkerExistRule

Error sets the error message for the rule.

func (*WorkerExistRule) Validate

func (r *WorkerExistRule) Validate(value interface{}) error

Validate checks that the service exists on the system.

type WorkerName

type WorkerName string

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool is

func (*WorkerPool) DisableWorker

func (pool *WorkerPool) DisableWorker(name WorkerName) error

DisableWorker sets state `WorkerDisabled` for workers with the specified `name`.

func (*WorkerPool) EnableWorker

func (pool *WorkerPool) EnableWorker(name WorkerName) error

EnableWorker sets state `WorkerEnabled` for workers with the specified `name`.

func (*WorkerPool) FailWorker

func (pool *WorkerPool) FailWorker(name WorkerName) error

FailWorker sets state `WorkerFailed` for workers with the specified `name`.

func (*WorkerPool) GetState

func (pool *WorkerPool) GetState(name WorkerName) sam.State

GetState returns current state for workers with the specified `name`.

func (*WorkerPool) GetWorkersStates

func (pool *WorkerPool) GetWorkersStates() map[WorkerName]sam.State

GetWorkersStates returns current state of all workers.

func (*WorkerPool) InitWorker

func (pool *WorkerPool) InitWorker(name WorkerName, ctx context.Context) error

InitWorker initializes the worker with the specified `name`.

func (*WorkerPool) IsDisabled

func (pool *WorkerPool) IsDisabled(name WorkerName) bool

IsDisabled reports whether the worker with the passed `name` is disabled.

func (*WorkerPool) IsEnabled

func (pool *WorkerPool) IsEnabled(name WorkerName) bool

IsEnabled reports whether the worker with the passed `name` is enabled.

func (*WorkerPool) IsRun

func (pool *WorkerPool) IsRun(name WorkerName) bool

IsRun reports whether the worker with the passed `name` is active.

func (*WorkerPool) ReplaceWorker

func (pool *WorkerPool) ReplaceWorker(name WorkerName, worker Worker)

func (*WorkerPool) RunWorkerExec

func (pool *WorkerPool) RunWorkerExec(name WorkerName, ctx WContext) (err error)

RunWorkerExec adds worker into pool.

func (*WorkerPool) SetState

func (pool *WorkerPool) SetState(name WorkerName, state sam.State) error

SetState updates state of specified worker.

func (*WorkerPool) SetWorker

func (pool *WorkerPool) SetWorker(name WorkerName, worker Worker)

SetWorker adds worker into pool.

func (*WorkerPool) StartWorker

func (pool *WorkerPool) StartWorker(name WorkerName) error

StartWorker sets state `WorkerEnabled` for workers with the specified `name`.

func (*WorkerPool) StopWorker

func (pool *WorkerPool) StopWorker(name WorkerName) error

StopWorker sets state `WorkerStopped` for workers with the specified `name`.

Directories

Path Synopsis
libs
clicheck module
cronjob module
logrus-hook module
logrushook module
zerolog-hook module
zerologhook module
