uwe

package module
v2.0.1-rc Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 12, 2019 License: Apache-2.0 Imports: 11 Imported by: 10

README

uwe

United Workers Environment


todo: update readme

Worker

Worker is an interface for async workers which launches and manages by the Chief.

The main ideas of the Worker:

  • this is simple and lightweight worker;
  • it can communicate with surroundings through channels, message queues, etc;
  • worker must do one or few small jobs;
  • should be able to run the worker as one independent process;

For example microservice for the image storing can have three workers:

  1. Rest API - receives and gives out images from user;
  2. Image resizer - compresses and resize the image for the thumbnails;
  3. Uploader - uploads files to the S3 storage.

All this workers are part of one microservice, in one binary and able to run them as a one process or as a three different.

Method list:
  • Init(context.Context) Worker - initializes new instance of the Worker implementation.
  • Run() - starts the Worker instance execution. This should be a blocking call, which in normal mode will be executed in goroutine.

Chief

Chief is a head of workers, it must be used to register, initialize and correctly start and stop asynchronous executors of the type Worker.

Method list:
  • AddWorker(name string, worker Worker - register a new Worker to the Chief worker pool.
  • EnableWorkers(names ...string) - enables all worker from the names list. By default, all added workers are enabled.
  • EnableWorker(name string) - enables the worker with the specified name. By default, all added workers are enabled.
  • IsEnabled(name string) bool - checks is enable worker with passed name.
  • InitWorkers(logger *logrus.Entry) - initializes all registered workers.
  • Start(parentCtx context.Context) - runs all registered workers, locks until the parentCtx closes, and then gracefully stops all workers.

In InitWorkers Chief insert in the context his logger (*logrus.Entry), so at the worker you can took by key routines.CtxKeyLog and use it.

// .....
func (w *MyWorker) Init(parentCtx context.Context) routines.Worker {
    logger, ok := parentCtx.Value(routines.CtxKeyLog).(*logrus.Entry)
    if !ok {
        // process error
    }
    // add field with worker name.
    w.Logger = logger.WithField("worker", "my-cool-worker")
    
    // ... do other stuff ... //
    
    return w
}
// .....

Usage

Just define the routines.Chief variable, register your worker using the AddWorker method. Before starting, you must initialize registered workers using the InitWorkers(*logrus.Entry) method.

A very simple example:

package main

import (
    "github.com/lancer-kit/armory/routines"
    "context"
)

var WorkersChief routines.Chief

func init()  {
    WorkersChief = routines.Chief{}
    WorkersChief.AddWorker("my-awesome-worker", &MyWorker{})
    // `MyWorker` is a type which implement `Worker` interface.
}

func main () {
    WorkersChief.InitWorkers(nil)
    ctx, cancel := context.WithCancel(context.Background())
    go func() {
        WorkersChief.Start(ctx)
    }()
    
    defer func() {
        cancel()
    }()
}

Documentation

Index

Constants

View Source
const (
	WStateNotExists   sam.State = "NotExists"
	WStateNew         sam.State = "New"
	WStateInitialized sam.State = "Initialized"
	WStateRun         sam.State = "Run"
	WStateStopped     sam.State = "Stopped"
	WStateFailed      sam.State = "Failed"
)
View Source
const DefaultForceStopTimeout = 45 * time.Second

DefaultForceStopTimeout is a timeout for killing all workers.

Variables

View Source
var (
	ErrWorkerNotExist = func(name WorkerName) error {
		return fmt.Errorf("%s: not exist", name)
	}
)

Functions

This section is empty.

Types

type Chief

type Chief interface {
	Run()
	Shutdown()

	AddWorker(WorkerName, Worker)
	AddWorkers(map[WorkerName]Worker)

	SetEventHandler(EventHandler)
	SetContext(context.Context)
	SetLocker(Locker)
	SetRecover(Recover)
	UseDefaultRecover()
	SetShutdown(Shutdown)

	Event() <-chan Event
}

func NewChief

func NewChief() Chief

type Context

type Context interface {
	context.Context
}

func NewContext

func NewContext() Context

type Event

type Event struct {
	Level   EventLevel
	Worker  WorkerName
	Fields  map[string]interface{}
	Message string
}

func ErrorEvent

func ErrorEvent(msg string) Event

func (Event) IsError

func (e Event) IsError() bool

func (Event) IsFatal

func (e Event) IsFatal() bool

func (Event) SetField

func (e Event) SetField(key string, value interface{}) Event

func (Event) SetWorker

func (e Event) SetWorker(name WorkerName) Event

func (Event) ToError

func (e Event) ToError() error

type EventHandler

type EventHandler func(Event)

type EventLevel

type EventLevel string
const (
	LvlFatal EventLevel = "fatal"
	LvlError EventLevel = "error"
	LvlInfo  EventLevel = "info"
)

type Locker

type Locker func()

type Recover

type Recover func()

type Shutdown

type Shutdown func()

type Worker

type Worker interface {
	// Init initializes new instance of the `Worker` implementation
	Init() error
	// Run starts the `Worker` instance execution.
	// Context should be used for listening to ctx.Done()
	Run(Context) error
}

Worker is an interface for async workers which launches and manages by the `Chief`.

type WorkerExistRule

type WorkerExistRule struct {
	AvailableWorkers map[WorkerName]struct{}
	// contains filtered or unexported fields
}

func (*WorkerExistRule) Error

func (r *WorkerExistRule) Error(message string) *WorkerExistRule

Error sets the error message for the rule.

func (*WorkerExistRule) Validate

func (r *WorkerExistRule) Validate(value interface{}) error

Validate checks that service exist on the system

type WorkerName

type WorkerName string

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool is

func (*WorkerPool) FailWorker

func (p *WorkerPool) FailWorker(name WorkerName) error

FailWorker sets state `WorkerFailed` for workers with the specified `name`.

func (*WorkerPool) GetState

func (p *WorkerPool) GetState(name WorkerName) sam.State

GetState returns current state for workers with the specified `name`.

func (*WorkerPool) GetWorkersStates

func (p *WorkerPool) GetWorkersStates() map[WorkerName]sam.State

GetWorkersStates returns current state of all workers.

func (*WorkerPool) InitWorker

func (p *WorkerPool) InitWorker(name WorkerName) error

InitWorker initializes all present workers.

func (*WorkerPool) IsRun

func (p *WorkerPool) IsRun(name WorkerName) bool

IsRun checks is active worker with passed `name`.

func (*WorkerPool) ReplaceWorker

func (p *WorkerPool) ReplaceWorker(name WorkerName, worker Worker)

func (*WorkerPool) RunWorkerExec

func (p *WorkerPool) RunWorkerExec(ctx Context, name WorkerName) (err error)

RunWorkerExec adds worker into pool.

func (*WorkerPool) SetState

func (p *WorkerPool) SetState(name WorkerName, state sam.State) error

SetState updates state of specified worker.

func (*WorkerPool) SetWorker

func (p *WorkerPool) SetWorker(name WorkerName, worker Worker) error

SetWorker adds worker into pool.

func (*WorkerPool) StartWorker

func (p *WorkerPool) StartWorker(name WorkerName) error

StartWorker sets state `WorkerEnabled` for workers with the specified `name`.

func (*WorkerPool) StopWorker

func (p *WorkerPool) StopWorker(name WorkerName) error

StopWorker sets state `WorkerStopped` for workers with the specified `name`.

Directories

Path Synopsis
examples
recover command
simpleapi command
simplecron command
presets
api

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL