uwe

package module
v3.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 14, 2023 License: Apache-2.0 Imports: 14 Imported by: 7

README

GoDoc Go Report Card

uwe

UWE (Ubiquitous Workers Engine) is a common toolset for building and organizing your Go application, actor-like workers.

Table of Content

  1. Quick Start

  2. Documentation

    1. Chief
    2. Worker
    3. Presets

Quick Start

Get uwe using go get:

go get github.com/lancer-kit/uwe/v3

Here is an example HelloWorld service with HTTP API and background worker:

package main

import (
	"fmt"
	"log"
	"net/http"
	"time"

	"github.com/lancer-kit/uwe/v3"
	"github.com/lancer-kit/uwe/v3/presets/api"
)

func main() {
	// fill configurations for the predefined worker that start an HTTP server
	apiCfg := api.Config{
		Host:              "0.0.0.0",
		Port:              8080,
		EnableCORS:        false,
		ApiRequestTimeout: 0,
	}

	// initialize new instance of Chief
	chief := uwe.NewChief()
	// will add workers into the pool
	chief.AddWorker("app-server", api.NewServer(apiCfg, getRouter()), uwe.Restart)
	chief.AddWorker("dummy", NewDummy(), uwe.Restart)
	
	// pass handler for internal events like errors, panics, warning, etc.
	// you can log it with you favorite logger (ex Logrus, Zap, etc)
	chief.SetEventHandler(uwe.STDLogEventHandler())

	// init all registered workers and run it all
	chief.Run()
}

type dummy struct{}

// NewDummy initialize new instance of dummy Worker.
func NewDummy() uwe.Worker {
	// At this point in most cases there we are preparing some state of the worker,
	// like a logger, configuration, variable, and fields.
	return &dummy{}
}

// Init is an interface method used to initialize some state of the worker
// that required interaction with outer context, for example, initialize some connectors.
func (d *dummy) Init() error { return nil }

// Run starts event loop of worker.
func (d *dummy) Run(ctx uwe.Context) error {
	// initialize all required stuffs for the execution flow 
	ticker := time.NewTicker(time.Second)

	for {
		select {
		case <-ticker.C:
			// define all the processing code here 
			// or move it to a method and make a call here
			log.Println("do something")
		case <-ctx.Done():
			// close all connections, channels and finalise state if needed
			log.Println("good bye")
			return nil
		}
	}
}

// getRouter is used to declare an API scheme, 
func getRouter() http.Handler {
	// instead default can be used any another compatible router
	mux := http.NewServeMux()
	mux.HandleFunc("/hello/uwe", func(w http.ResponseWriter, r *http.Request) {
		_, _ = fmt.Fprintln(w, "hello world")
	})

	log.Println("REST API router initialized")
	return mux
}

Documentation

Chief

Chief is a supervisor that can be placed at the top of the go application's execution stack, it is blocked until SIGTERM is intercepted and then it shutdown all workers gracefully. Also, Chief can be used as a child supervisor inside the Worker, which is launched by Chief at the top-level.

Worker

Worker is an interface for async workers which launches and manages by the Chief.

  1. Init() - method used to initialize some state of the worker that required interaction with outer context, for example, initialize some connectors. In many cases this method is optional, so it can be implemented as empty: func (*W) Init() error { return nil }.
  2. Run(ctx Context) error - starts the Worker instance execution. The context will provide a signal when a worker must stop through the ctx.Done().

Workers lifecycle:

 (*) -> [New] -> [Initialized] -> [Run] -> [Stopped]
          |             |           |
          |             |           ↓
          |-------------|------> [Failed]
Presets

This library provides some working presets to simplify the use of Chief in projects and reduce duplicate code.

HTTP Server

api.Server is worker by default for starting a standard HTTP server. Server requires configuration and initialized http.Handler.

The HTTP server will work properly and will be correctly disconnected upon a signal from Supervisor (Chief).

Warning: this Server does not process SSL/TLS certificates on its own.To start an HTTPS server, look for a specific worker.

package main

import (
   "fmt"
   "net/http"
   "time"

   "github.com/lancer-kit/uwe/v3"
   "github.com/lancer-kit/uwe/v3/presets/api"
)

func main() {
   // fill configurations for the predefined worker that start an HTTP server
   apiCfg := api.Config{
      Host:              "0.0.0.0",
      Port:              8080,
      EnableCORS:        false,
      ApiRequestTimeout: 45 * time.Second,
      ReadHeaderTimeout: 45 * time.Second,
   }

   // instead default can be used any another compatible router
   mux := http.NewServeMux()
   mux.HandleFunc("/hello/uwe", func(w http.ResponseWriter, r *http.Request) {
      _, _ = fmt.Fprintln(w, "hello world")
   })


   // initialize new instance of Chief
   uwe.NewChief().
	   SetEventHandler(uwe.STDLogEventHandler()).
	   AddWorker("app-server", api.NewServer(apiCfg, mux)).
	   Run()
   
   // or 
   
   chief  := uwe.NewChief()
   chief.SetEventHandler(uwe.STDLogEventHandler())
   chief.AddWorker("app-server", api.NewServer(apiCfg, mux))
   chief.Run()
}
Job

presets.Job is a primitive worker who performs an action callback with a given period.

package main

import (
	"log"
	"time"

	"github.com/lancer-kit/uwe/v3"
	"github.com/lancer-kit/uwe/v3/presets"
)

func main() {
	var action = func() error {
		// define all the processing code here
		// or move it to a method and make a call here
		log.Println("do something")
		return nil
	}

	// initialize new instance of Chief
	chief := uwe.NewChief()
	chief.SetEventHandler(uwe.STDLogEventHandler())

	// will add workers into the pool
	chief.AddWorker("simple-job", presets.NewJob(time.Second, action))

	chief.Run()
}
WorkerFunc

presets.WorkerFunc is a type of worker that consist from one function. Allow to use the function as worker.

package presets

import (
	"log"
	"time"

	"github.com/lancer-kit/uwe/v3"
	"github.com/lancer-kit/uwe/v3/presets"
)

func main() {
	var anonFuncWorker = func(ctx uwe.Context) error {
		// initialize all required stuffs for the execution flow
		ticker := time.NewTicker(time.Second)
		for {
			select {
			case <-ticker.C:
				// define all the processing code here
				// or move it to a method and make a call here
				log.Println("do something")
			case <-ctx.Done():
				// close all connections, channels and finalise state if needed
				log.Println("good bye")
				return nil
			}
		}
	}

	// initialize new instance of Chief
	chief := uwe.NewChief()
	chief.SetEventHandler(uwe.STDLogEventHandler())

	// will add workers into the pool
	chief.AddWorker("anon-func", presets.WorkerFunc(anonFuncWorker))

	chief.Run()
}

License

This library is distributed under the Apache 2.0 license.

Documentation

Overview

Package uwe (Ubiquitous Workers Engine) is a common toolset for building and organizing Go application with actor-like workers.

`Chief` is a supervisor that can be placed at the top of the go application's execution stack, it is blocked until SIGTERM is intercepted and then it shutdown all workers gracefully. Also, `Chief` can be used as a child supervisor inside the` Worker`, which is launched by `Chief` at the top-level.

`Worker` is an interface for async workers which launches and manages by the **Chief**.

1. `Init()` - method used to initialize some state of the worker that required interaction with outer context, for example, initialize some connectors. In many cases this method is optional, so it can be implemented as empty:

`func (*W) Init() error { return nil }`.

2. `Run(ctx Context) error` - starts the `Worker` instance execution. The context will provide a signal when a worker must stop through the `ctx.Done()`.

Workers lifecycle:

```text (*) -> [New] -> [Initialized] -> [Run] -> [Stopped]

|             |           |
|             |           ↓
|-------------|------> [Failed]

```

Example
package main

import (
	"log"
	"time"
)

func main() {
	// initialize new instance of Chief
	chief := NewChief()
	// will add workers into the pool
	chief.AddWorker("dummy", NewDummy())

	// pass handler for internal events like errors, panics, warning, etc.
	// you can log it with you favorite logger (ex Logrus, Zap, etc)
	chief.SetEventHandler(STDLogEventHandler())

	// init all registered workers and run it all
	chief.Run()
}

type dummy struct{}

// NewDummy initialize new instance of dummy Worker.
func NewDummy() Worker {
	// At this point in most cases there we are preparing some state of the worker,
	// like a logger, configuration, variable, and fields.
	return &dummy{}
}

// Init is an interface method used to initialize some state of the worker
// that required interaction with outer context, for example, initialize some connectors.
func (d *dummy) Init() error { return nil }

// Run starts event loop of worker.
func (d *dummy) Run(ctx Context) error {
	// initialize all required stuffs for the execution flow
	ticker := time.NewTicker(time.Second)

	for {
		select {
		case <-ticker.C:
			// define all the processing code here
			// or move it to a method and make a call here
			log.Println("do something")
		case msg := <-ctx.Messages():
			if msg == nil {
				continue
			}
			log.Printf("Received new message: sender(%s) kind(%d) data(%v)\n",
				msg.Sender, msg.Kind, msg.Data)
		case <-ctx.Done():
			// close all connections, channels and finalise state if needed
			log.Println("good bye")
			return nil
		}
	}
}

Index

Examples

Constants

View Source
const (
	// StatusAction is a command useful for health-checks, because it returns status of all workers.
	StatusAction = "status"
	// PingAction is a simple command that returns the "pong" message.
	PingAction = "ping"
)
View Source
const (
	TargetBroadcast = "*"
	TargetSelfInit  = "self-init"
)
View Source
const (
	WStateNotExists   sam.State = "NotExists"
	WStateNew         sam.State = "New"
	WStateInitialized sam.State = "Initialized"
	WStateRun         sam.State = "Run"
	WStateStopped     sam.State = "Stopped"
	WStateFailed      sam.State = "Failed"
)
View Source
const (
	// Restart is a strategy to restart Worker
	// in case of panic or exit with error.
	Restart = RestartOnFail | RestartOnError
	// RestartAndReInit is a strategy to reinitialize and restart Worker
	// in case of panic or exit with error.
	RestartAndReInit = RestartOnFail | RestartOnError | RestartWithReInit
)
View Source
const DefaultForceStopTimeout = 45 * time.Second

DefaultForceStopTimeout is a timeout for killing all workers.

Variables

This section is empty.

Functions

This section is empty.

Types

type AppInfo

type AppInfo struct {
	Name    string `json:"name"`
	Version string `json:"version"`
	Build   string `json:"build"`
	Tag     string `json:"tag"`
}

AppInfo is a details of the *Application* build.

func (AppInfo) SocketName

func (app AppInfo) SocketName() string

SocketName returns name of *Chief Service Socket*.

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

func NewBroker

func NewBroker(defaultChanLen int) *Broker

func (*Broker) AddWorker

func (hub *Broker) AddWorker(name WorkerName) Mailbox

func (*Broker) DefaultBus

func (hub *Broker) DefaultBus() SenderBus

func (*Broker) Init

func (hub *Broker) Init() error

func (*Broker) Serve

func (hub *Broker) Serve(ctx context.Context)

type Chief

type Chief interface {
	// AddWorker registers the worker in the pool.
	AddWorker(WorkerName, Worker, ...WorkerOpts) Chief
	AddWorkerAndLaunch(WorkerName, Worker, ...WorkerOpts) Chief
	// GetWorkersStates returns the current state of all registered workers.
	GetWorkersStates() map[WorkerName]sam.State
	// EnableServiceSocket initializes `net.Socket` server for internal management purposes.
	// By default, includes two actions:
	// 	- "status" is a healthcheck-like, because it returns status of all workers;
	// 	- "ping" is a simple command that returns the "pong" message.
	// The user can provide his own list of actions with handler closures.
	EnableServiceSocket(app AppInfo, actions ...socket.Action) Chief
	// Event returns the channel with internal Events.
	// > ATTENTION:
	//   `Event() <-chan Event` and `SetEventHandler(EventHandler)`
	// are mutually exclusive, but one of them must be used!
	Event() <-chan Event
	// SetEventHandler adds a callback that processes the `Chief`
	// internal events and can log them or do something else.
	// > ATTENTION:
	//   `Event() <-chan Event` and `SetEventHandler(EventHandler)`
	// are mutually exclusive, but one of them must be used!
	SetEventHandler(EventHandler) Chief
	// SetContext replaces the default context with the provided one.
	// It can be used to deliver some values inside `(Worker) .Run (ctx Context)`.
	SetContext(context.Context) Chief
	// SetLocker sets a custom `Locker`, if it is not set,
	// the default `Locker` will be used, which expects SIGTERM or SIGINT system signals.
	SetLocker(Locker) Chief
	// SetShutdown sets `Shutdown` callback.
	SetShutdown(Shutdown) Chief
	// SetForceStopTimeout replaces the `DefaultForceStopTimeout`.
	// ForceStopTimeout is the duration before
	// the worker will be killed if it wouldn't finish Run after the stop signal.
	SetForceStopTimeout(time.Duration) Chief
	// UseCustomIMQBroker sets non-standard implementation
	// of the IMQBroker to replace default one.
	UseCustomIMQBroker(IMQBroker) Chief
	// UseNopIMQBroker replaces default IMQ Broker by empty stub.
	// NOP stands for no-operations.
	UseNopIMQBroker() Chief
	// Run is the main entry point into the `Chief` run loop.
	// This method initializes all added workers, the server `net.Socket`,
	// if enabled, starts the workers in separate routines
	// and waits for the end of lock produced by the locker function.
	Run()
	// Shutdown sends stop signal to all child goroutines
	// by triggering of the `context.CancelFunc()` and
	// executes `Shutdown` callback.
	Shutdown()
}

Chief is a supervisor that can be placed at the top of the go app's execution stack, it is blocked until SIGTERM is intercepted, and then it shut down all workers gracefully. Also, `Chief` can be used as a child supervisor inside the `Worker`, which is launched by `Chief` at the top-level.

func NewChief

func NewChief() Chief

NewChief returns new instance of standard `Chief` implementation.

type Context

type Context interface {
	context.Context
	Mailbox
}

Context is a wrapper over the standard `context.Context`. The main purpose of this is to extend in the future.

func NewContext

func NewContext(c context.Context, m Mailbox) Context

NewContext returns new context.

type Event

type Event struct {
	Level   EventLevel
	Worker  WorkerName
	Fields  map[string]interface{}
	Message string
}

Event is a message object that is used to signalize about Chief's internal events and processed by `EventHandlers`.

func ErrorEvent

func ErrorEvent(msg string) Event

ErrorEvent returns new Event with `LvlError` and provided message.

func (Event) FormatFields

func (e Event) FormatFields() string

FormatFields concatenates fields in string format: "k1=value k2=value "

func (Event) IsError

func (e Event) IsError() bool

IsError returns `true` if event level is `Error`

func (Event) IsFatal

func (e Event) IsFatal() bool

IsFatal returns `true` if event level is `Fatal`

func (Event) SetField

func (e Event) SetField(key string, value interface{}) Event

SetField add to event some Key/Value.

func (Event) SetWorker

func (e Event) SetWorker(name WorkerName) Event

SetWorker sets the provided `worker` as the event source.

func (Event) ToError

func (e Event) ToError() error

ToError validates event level and cast to builtin `error`.

type EventHandler

type EventHandler func(Event)

EventHandler callback that processes the `Chief` internal events, can log them or do something else.

func STDLogEventHandler

func STDLogEventHandler() EventHandler

STDLogEventHandler returns a callback that handles internal `Chief` events and logs its.

type EventLevel

type EventLevel string

EventLevel ...

const (
	LvlFatal EventLevel = "fatal"
	LvlError EventLevel = "error"
	LvlInfo  EventLevel = "info"
)

type IMQBroker

type IMQBroker interface {
	DefaultBus() SenderBus
	AddWorker(name WorkerName) Mailbox
	Init() error
	Serve(ctx context.Context)
}

type Locker

type Locker func()

Locker is a function whose completion of a call is a signal to stop `Chief` and all workers.

type Mailbox

type Mailbox interface {
	SenderBus
	ReaderBus
}

func NewBus

func NewBus(name WorkerName, toWorker, fromWorker chan *Message) Mailbox

type Message

type Message struct {
	Target WorkerName
	Sender WorkerName
	Kind   MessageKind
	Data   interface{}
}

type MessageKind

type MessageKind int

type NopBroker

type NopBroker struct{}

NopBroker is an empty IMQBroker

func (*NopBroker) AddWorker

func (*NopBroker) AddWorker(name WorkerName) Mailbox

func (*NopBroker) DefaultBus

func (*NopBroker) DefaultBus() SenderBus

func (*NopBroker) Init

func (*NopBroker) Init() error

func (*NopBroker) Serve

func (*NopBroker) Serve(ctx context.Context)

type NopMailbox

type NopMailbox struct{}

NopMailbox is an empty Mailbox

func (*NopMailbox) Messages

func (*NopMailbox) Messages() <-chan *Message

func (*NopMailbox) SelfInit

func (m *NopMailbox) SelfInit(WorkerName) Mailbox

func (*NopMailbox) Send

func (*NopMailbox) Send(WorkerName, interface{})

func (*NopMailbox) SendToMany

func (*NopMailbox) SendToMany(MessageKind, interface{}, ...WorkerName)

func (*NopMailbox) SendWithKind

func (*NopMailbox) SendWithKind(WorkerName, MessageKind, interface{})

type ReaderBus

type ReaderBus interface {
	Messages() <-chan *Message
}

func NewReaderBus

func NewReaderBus(name WorkerName, toWorker chan *Message) ReaderBus

type Recover

type Recover func(name WorkerName)

Recover is a function that will be used as a `defer call` to handle each worker's panic.

type RestartOption

type RestartOption int

RestartOption is a behavior strategy for workers in case of error exit or panic.

const (
	// RestartOnFail strategy to restart worker ONLY if it panics.
	// Worker will be restarted by calling the Run() method again.
	RestartOnFail RestartOption = 1 << iota
	// RestartOnError strategy to restart worker
	// ONLY if the Run() method return error.
	// Worker will be restarted by calling the Run() method again.
	RestartOnError
	// RestartWithReInit strategy that adds reinitialization before restart.
	// RestartWithReInit works only with RestartOnFail and/or RestartOnError.
	// Worker will be reinitialized and restarted
	// by calling the Init() and Run() method again.
	RestartWithReInit

	// NoRestart is a default strategy.
	//Worker wouldn't be restarted.
	NoRestart RestartOption = -1
	// StopAppOnFail strategy to whole stop app
	// in case if Worker panicked or exited with error.
	StopAppOnFail RestartOption = -2
)

func (RestartOption) Is

func (opt RestartOption) Is(mode RestartOption) bool

type SenderBus

type SenderBus interface {
	Send(target WorkerName, data interface{})
	SendWithKind(target WorkerName, kind MessageKind, data interface{})
	SendToMany(kind MessageKind, data interface{}, targets ...WorkerName)
	SelfInit(name WorkerName) Mailbox
}

func NewSenderBus

func NewSenderBus(fromWorker chan<- *Message) SenderBus

type Shutdown

type Shutdown func()

Shutdown is a callback function that will be executed after the Chief and workers are stopped. Its main purpose is to close, complete, or retain some global states or shared resources.

type StateInfo

type StateInfo struct {
	App     AppInfo                  `json:"app"`
	Workers map[WorkerName]sam.State `json:"workers"`
}

StateInfo is result the `StatusAction` command.

func ParseStateInfo

func ParseStateInfo(data json.RawMessage) (*StateInfo, error)

ParseStateInfo decodes `StateInfo` from the JSON response for the `StatusAction` command.

type Worker

type Worker interface {
	// Run  starts the `Worker` instance execution. The context will provide a signal
	// when a worker must stop through the `ctx.Done()`.
	Run(ctx Context) error
}

Worker is an interface for async workers which launches and manages by the `Chief`.

type WorkerExistRule

type WorkerExistRule struct {
	AvailableWorkers map[WorkerName]struct{}
	// contains filtered or unexported fields
}

WorkerExistRule is a custom validation rule for the validation libs.

func (*WorkerExistRule) Error

func (r *WorkerExistRule) Error(message string) *WorkerExistRule

Error sets the error message for the rule.

func (*WorkerExistRule) Validate

func (r *WorkerExistRule) Validate(value interface{}) error

Validate checks that service exist on the system

type WorkerName

type WorkerName string

type WorkerOpts

type WorkerOpts interface {
	// contains filtered or unexported methods
}

type WorkerWithInit

type WorkerWithInit interface {
	Worker
	// Init initializes some state of the worker that required interaction with outer context,
	// for example, initialize some connectors. In many cases this method is optional,
	// so it can be implemented as empty: `func (*W) Init() error { return nil }`.
	Init() error
}

Directories

Path Synopsis
api

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL