uwe

Version: v1.2.0
Warning

This package is not in the latest version of its module.

Published: Dec 3, 2019 License: Apache-2.0 Imports: 11 Imported by: 0

README

uwe

United Workers Environment


todo: update readme

Worker

Worker is an interface for asynchronous workers that are launched and managed by the Chief.

The main ideas behind the Worker:

  • it is a simple and lightweight worker;
  • it can communicate with its surroundings through channels, message queues, etc.;
  • a worker should do one or a few small jobs;
  • it should be possible to run a worker as an independent process.

For example, a microservice for storing images can have three workers:

  1. REST API - receives images from the user and serves them back;
  2. Image resizer - compresses and resizes images for thumbnails;
  3. Uploader - uploads files to S3 storage.

All these workers are part of one microservice and live in one binary. They can run together as a single process or as three separate processes.

Method list:
  • Init(context.Context) Worker - initializes a new instance of the Worker implementation.
  • Run() - starts execution of the Worker instance. This should be a blocking call; in normal mode it is executed in a goroutine.

Chief

Chief is the head of the workers. It must be used to register, initialize, and correctly start and stop asynchronous executors of type Worker.

Method list:
  • AddWorker(name string, worker Worker) - registers a new Worker in the Chief worker pool.
  • EnableWorkers(names ...string) - enables all workers from the names list. By default, all added workers are enabled.
  • EnableWorker(name string) - enables the worker with the specified name. By default, all added workers are enabled.
  • IsEnabled(name string) bool - reports whether the worker with the given name is enabled.
  • InitWorkers(logger *logrus.Entry) - initializes all registered workers.
  • Start(parentCtx context.Context) - runs all registered workers, blocks until parentCtx is done, and then gracefully stops all workers.

In InitWorkers, the Chief inserts its logger (*logrus.Entry) into the context, so a worker can retrieve it with the key routines.CtxKeyLog and use it.

// .....
func (w *MyWorker) Init(parentCtx context.Context) routines.Worker {
    logger, ok := parentCtx.Value(routines.CtxKeyLog).(*logrus.Entry)
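    if !ok {
        // No logger in the context: fall back to the standard logrus logger
        // so that the WithField call below does not hit a nil pointer.
        logger = logrus.NewEntry(logrus.StandardLogger())
    }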
    // add field with worker name.
    w.Logger = logger.WithField("worker", "my-cool-worker")
    
    // ... do other stuff ... //
    
    return w
}
// .....

Usage

Define a routines.Chief variable and register your worker using the AddWorker method. Before starting, you must initialize the registered workers using the InitWorkers(*logrus.Entry) method.

A very simple example:

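package main

import (
    "context"
    "os"
    "os/signal"
    "syscall"

    "github.com/lancer-kit/armory/routines"
)

var WorkersChief routines.Chief

func init() {
    WorkersChief = routines.Chief{}
    // `MyWorker` is a type that implements the `Worker` interface.
    WorkersChief.AddWorker("my-awesome-worker", &MyWorker{})
}

func main() {
    // A nil logger is passed for brevity; real code would pass a *logrus.Entry.
    WorkersChief.InitWorkers(nil)

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Start blocks until ctx is done, so run it in a goroutine
    // and wait for it to return before exiting.
    done := make(chan struct{})
    go func() {
        defer close(done)
        WorkersChief.Start(ctx)
    }()

    // Keep the process alive until a termination signal arrives.
    stop := make(chan os.Signal, 1)
    signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
    <-stop

    cancel()
    <-done
}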

Documentation

Constants

const (
	WStateDisabled    sam.State = "Disabled"
	WStateEnabled     sam.State = "Enabled"
	WStateInitialized sam.State = "Initialized"
	WStateRun         sam.State = "Run"
	WStateStopped     sam.State = "Stopped"
	WStateFailed      sam.State = "Failed"
)

Variables

var (
	ErrWorkerNotExist = func(name WorkerName) error {
		return fmt.Errorf("%s: not exist", name)
	}
)
var ForceStopTimeout = 45 * time.Second

ForceStopTimeout is a timeout for killing all workers.

var WorkersStates = map[sam.State]struct{}{
	WStateDisabled:    {},
	WStateEnabled:     {},
	WStateInitialized: {},
	WStateRun:         {},
	WStateStopped:     {},
	WStateFailed:      {},
}

WorkersStates is the set of valid worker states.
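
A map with empty-struct values acts as a set, so validating a state is a single lookup. The sketch below mirrors the shape of that map with a local `State` type standing in for `sam.State` and only a subset of the states. `isValidState` is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// State is a local stand-in for sam.State.
type State string

const (
	WStateDisabled State = "Disabled"
	WStateRun      State = "Run"
)

// validStates mirrors the shape of WorkersStates with a subset of the states.
var validStates = map[State]struct{}{
	WStateDisabled: {},
	WStateRun:      {},
}

// isValidState reports whether s is a member of the set.
func isValidState(s State) bool {
	_, ok := validStates[s]
	return ok
}

func main() {
	fmt.Println(isValidState(WStateRun))       // true
	fmt.Println(isValidState(State("Paused"))) // false
}
```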

Functions

This section is empty.

Types

type Chief

type Chief struct {

	// EnableByDefault sets all workers to `Enabled`
	// if no workers are explicitly passed to enable.
	EnableByDefault bool
	// AppName is the main application identifier of the instance, used by the logger, etc.
	AppName string
	// contains filtered or unexported fields
}

Chief is the head of the workers. It must be used to register, initialize, and correctly start and stop asynchronous executors of type `Worker`.

func NewChief

func NewChief(name string, enableByDefault bool, logger *logrus.Entry) *Chief

NewChief creates and initializes a new instance of `Chief`.

func (*Chief) AddValueToContext

func (chief *Chief) AddValueToContext(key, value interface{})

func (*Chief) AddWorker

func (chief *Chief) AddWorker(name WorkerName, worker Worker)

AddWorker registers a new `Worker` in the `Chief` worker pool.

func (*Chief) EnableWorker

func (chief *Chief) EnableWorker(name WorkerName) error

EnableWorker enables the worker with the specified `name`. By default, all added workers are enabled. After the first call of this method, only explicitly enabled workers will be active.

func (*Chief) EnableWorkers

func (chief *Chief) EnableWorkers(names ...WorkerName) (err error)

EnableWorkers enables all workers from the `names` list. By default, all added workers are enabled. After the first call of this method, only explicitly enabled workers will be active.
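
The "enabled by default until something is enabled explicitly" rule can be pictured with a small registry. This is a sketch of the described behaviour under the assumption that the rule is tracked with two sets. The `registry` type and its methods are hypothetical and do not reflect the Chief's internals.

```go
package main

import "fmt"

// registry is a hypothetical stand-in for the Chief's bookkeeping.
type registry struct {
	known   map[string]struct{} // every added worker
	enabled map[string]struct{} // workers enabled explicitly
}

func newRegistry(names ...string) *registry {
	r := &registry{
		known:   make(map[string]struct{}),
		enabled: make(map[string]struct{}),
	}
	for _, n := range names {
		r.known[n] = struct{}{}
	}
	return r
}

// enable marks a known worker as explicitly enabled.
func (r *registry) enable(name string) error {
	if _, ok := r.known[name]; !ok {
		return fmt.Errorf("%s: not exist", name)
	}
	r.enabled[name] = struct{}{}
	return nil
}

// isActive treats every known worker as active until the first explicit
// enable call; after that only explicitly enabled workers are active.
func (r *registry) isActive(name string) bool {
	if _, ok := r.known[name]; !ok {
		return false
	}
	if len(r.enabled) == 0 {
		return true
	}
	_, ok := r.enabled[name]
	return ok
}

func main() {
	r := newRegistry("api", "resizer", "uploader")
	fmt.Println(r.isActive("resizer")) // true: nothing enabled explicitly yet

	_ = r.enable("api")
	fmt.Println(r.isActive("api"))     // true
	fmt.Println(r.isActive("resizer")) // false: only "api" is enabled now
}
```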

func (*Chief) GetContext

func (chief *Chief) GetContext() context.Context

func (*Chief) GetWorkersStates

func (chief *Chief) GetWorkersStates() map[WorkerName]sam.State

func (*Chief) Init

func (chief *Chief) Init(logger *logrus.Entry) *Chief

Init initializes all internal states properly.

func (*Chief) IsEnabled

func (chief *Chief) IsEnabled(name WorkerName) bool

IsEnabled reports whether the worker with the given `name` is enabled.

func (*Chief) Run

func (chief *Chief) Run(workers ...WorkerName) error

Run enables the passed workers, starts the worker pool, and blocks until it intercepts `syscall.SIGTERM` or `syscall.SIGINT`. NOTE: Use this method ONLY as a top-level action.

func (*Chief) RunWithContext

func (chief *Chief) RunWithContext(ctx context.Context, workers ...WorkerName) error

func (*Chief) RunWithLocker

func (chief *Chief) RunWithLocker(locker func(), workers ...WorkerName) (err error)

RunWithLocker `locker` function should block the execution context and wait for some signal to stop.

func (*Chief) StartPool

func (chief *Chief) StartPool(parentCtx context.Context) int

StartPool runs all registered workers, blocks until `parentCtx` is done, and then gracefully stops all workers. Returns a result code:

-1 — start failed
 0 — stopped properly

type CtxKey

type CtxKey string

CtxKey is the type of context keys for the values placed by `Chief`.

const (
	// CtxKeyLog is a context key for a `*logrus.Entry` value.
	CtxKeyLog CtxKey = "chief-log"
)

type ExitCode

type ExitCode int
const (
	// ExitCodeOk means that the worker is stopped.
	ExitCodeOk ExitCode = iota
	// ExitCodeInterrupted means that the work cycle has been interrupted and can be restarted.
	ExitCodeInterrupted
	// ExitCodeFailed means that the worker fails.
	ExitCodeFailed
	// ExitReinitReq means that the worker can't do its job and requires reinitialization.
	ExitReinitReq
)
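
One way a supervisor could react to these codes is sketched below. The restart policy is an assumption inferred from the constant comments, not the Chief's confirmed behaviour. `supervise` and its parameters are hypothetical names.

```go
package main

import "fmt"

// ExitCode and its values are redeclared locally to keep the sketch self-contained.
type ExitCode int

const (
	ExitCodeOk ExitCode = iota
	ExitCodeInterrupted
	ExitCodeFailed
	ExitReinitReq
)

// supervise runs `run` up to maxRuns times under an assumed policy:
// Ok stops, Interrupted restarts, ExitReinitReq re-initializes and restarts,
// Failed restarts only when restartOnFail is true.
// It returns the number of runs performed and the last exit code.
func supervise(initFn func(), run func() ExitCode, restartOnFail bool, maxRuns int) (int, ExitCode) {
	last := ExitCodeOk
	for runs := 1; runs <= maxRuns; runs++ {
		last = run()
		switch last {
		case ExitCodeOk:
			return runs, last
		case ExitCodeInterrupted:
			// nothing to prepare, just run again
		case ExitReinitReq:
			initFn()
		case ExitCodeFailed:
			if !restartOnFail {
				return runs, last
			}
		}
	}
	return maxRuns, last
}

func main() {
	codes := []ExitCode{ExitCodeInterrupted, ExitReinitReq, ExitCodeOk}
	i, inits := 0, 0

	runs, last := supervise(
		func() { inits++ },
		func() ExitCode { c := codes[i]; i++; return c },
		false, 10,
	)
	fmt.Printf("runs=%d last=%d inits=%d\n", runs, last, inits) // runs=3 last=0 inits=1
}
```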

type Message

type Message struct {
	UID    int64
	Target WorkerName
	Sender WorkerName
	Data   interface{}
}

type WContext

type WContext interface {
	context.Context
	SendMessage(target WorkerName, data interface{}) error
	MessageBus() <-chan *Message
}

func NewContext

func NewContext(name WorkerName, ctx context.Context, in, out chan *Message) WContext
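
The `in` and `out` channels suggest a shared bus on which messages are routed by their `Target`. The sketch below shows one way such routing could work with plain channels. `route` and the `inboxes` map are hypothetical, and the types are redeclared locally.

```go
package main

import "fmt"

// WorkerName and Message are redeclared locally to keep the sketch self-contained.
type WorkerName string

type Message struct {
	UID    int64
	Target WorkerName
	Sender WorkerName
	Data   interface{}
}

// route is a hypothetical dispatcher: it reads messages from the shared out
// channel and forwards each one to the inbox of its target worker.
// Messages addressed to unknown workers are dropped.
func route(out <-chan *Message, inboxes map[WorkerName]chan *Message) {
	for msg := range out {
		if inbox, ok := inboxes[msg.Target]; ok {
			inbox <- msg
		}
	}
}

func main() {
	out := make(chan *Message)
	inboxes := map[WorkerName]chan *Message{
		"resizer": make(chan *Message, 1),
	}
	go route(out, inboxes)

	out <- &Message{UID: 1, Target: "resizer", Sender: "api", Data: "image-42"}

	msg := <-inboxes["resizer"]
	fmt.Println(msg.Sender, "->", msg.Target, msg.Data) // api -> resizer image-42
	close(out)
}
```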

type Worker

type Worker interface {
	// Init initializes new instance of the `Worker` implementation,
	// this context should be used only as Key/Value transmitter,
	// DO NOT use it for `<- ctx.Done()`
	Init(ctx context.Context) Worker
	// RestartOnFail determines the need to restart the worker, if it stopped.
	RestartOnFail() bool
	// Run starts the `Worker` instance execution.
	Run(ctx WContext) ExitCode
}

Worker is an interface for asynchronous workers that are launched and managed by the `Chief`.

type WorkerExistRule

type WorkerExistRule struct {
	AvailableWorkers map[WorkerName]struct{}
	// contains filtered or unexported fields
}

func (*WorkerExistRule) Error

func (r *WorkerExistRule) Error(message string) *WorkerExistRule

Error sets the error message for the rule.

func (*WorkerExistRule) Validate

func (r *WorkerExistRule) Validate(value interface{}) error

Validate checks that the service exists on the system.
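
A rule of this shape can be sketched as a set lookup with a configurable error message. The `existRule` type below is a hypothetical local version. Which value types the real rule accepts and its default message are not stated in the documentation, so both are assumptions here.

```go
package main

import (
	"errors"
	"fmt"
)

type WorkerName string

// existRule is a hypothetical local version of the rule.
type existRule struct {
	available map[WorkerName]struct{}
	message   string
}

func newExistRule(names ...WorkerName) *existRule {
	r := &existRule{
		available: make(map[WorkerName]struct{}),
		message:   "invalid worker name", // assumed default message
	}
	for _, n := range names {
		r.available[n] = struct{}{}
	}
	return r
}

// Error sets the error message and returns the rule for chaining.
func (r *existRule) Error(message string) *existRule {
	r.message = message
	return r
}

// Validate accepts a WorkerName or a string and checks set membership.
func (r *existRule) Validate(value interface{}) error {
	var name WorkerName
	switch v := value.(type) {
	case WorkerName:
		name = v
	case string:
		name = WorkerName(v)
	default:
		return errors.New("value must be a worker name")
	}
	if _, ok := r.available[name]; !ok {
		return errors.New(r.message)
	}
	return nil
}

func main() {
	rule := newExistRule("api", "uploader").Error("unknown worker")
	fmt.Println(rule.Validate("api"))     // <nil>
	fmt.Println(rule.Validate("resizer")) // unknown worker
}
```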

type WorkerName

type WorkerName string

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool is

func (*WorkerPool) DisableWorker

func (pool *WorkerPool) DisableWorker(name WorkerName) error

DisableWorker sets the state `WStateDisabled` for the worker with the specified `name`.

func (*WorkerPool) EnableWorker

func (pool *WorkerPool) EnableWorker(name WorkerName) error

EnableWorker sets the state `WStateEnabled` for the worker with the specified `name`.

func (*WorkerPool) FailWorker

func (pool *WorkerPool) FailWorker(name WorkerName) error

FailWorker sets the state `WStateFailed` for the worker with the specified `name`.

func (*WorkerPool) GetState

func (pool *WorkerPool) GetState(name WorkerName) sam.State

GetState returns the current state of the worker with the specified `name`.

func (*WorkerPool) GetWorkersStates

func (pool *WorkerPool) GetWorkersStates() map[WorkerName]sam.State

GetWorkersStates returns the current state of all workers.

func (*WorkerPool) InitWorker

func (pool *WorkerPool) InitWorker(name WorkerName, ctx context.Context) error

InitWorker initializes the worker with the specified `name`.

func (*WorkerPool) IsDisabled

func (pool *WorkerPool) IsDisabled(name WorkerName) bool

IsDisabled reports whether the worker with the given `name` is disabled.

func (*WorkerPool) IsEnabled

func (pool *WorkerPool) IsEnabled(name WorkerName) bool

IsEnabled reports whether the worker with the given `name` is enabled.

func (*WorkerPool) IsRun

func (pool *WorkerPool) IsRun(name WorkerName) bool

IsRun reports whether the worker with the given `name` is running.

func (*WorkerPool) ReplaceWorker

func (pool *WorkerPool) ReplaceWorker(name WorkerName, worker Worker)

func (*WorkerPool) RunWorkerExec

func (pool *WorkerPool) RunWorkerExec(name WorkerName, ctx WContext) (err error)

RunWorkerExec runs the worker with the specified `name`.

func (*WorkerPool) SetState

func (pool *WorkerPool) SetState(name WorkerName, state sam.State) error

SetState updates the state of the specified worker.

func (*WorkerPool) SetWorker

func (pool *WorkerPool) SetWorker(name WorkerName, worker Worker)

SetWorker adds a worker to the pool.

func (*WorkerPool) StartWorker

func (pool *WorkerPool) StartWorker(name WorkerName) error

StartWorker sets the state `WStateRun` for the worker with the specified `name`.

func (*WorkerPool) StopWorker

func (pool *WorkerPool) StopWorker(name WorkerName) error

StopWorker sets the state `WStateStopped` for the worker with the specified `name`.

Directories

Path Synopsis
libs
clicheck module
cronjob module
logrus-hook module
logrushook module
zerolog-hook module
zerologhook module
presets
api
