Documentation ¶
Overview ¶
Package uwe (Ubiquitous Workers Engine) is a common toolset for building and organizing Go applications with actor-like workers.
`Chief` is a supervisor that can be placed at the top of a Go application's execution stack. It blocks until SIGTERM is intercepted and then shuts down all workers gracefully. `Chief` can also be used as a child supervisor inside a `Worker` that is launched by a top-level `Chief`.
`Worker` is an interface for async workers that are launched and managed by the `Chief`.
1. `Init()` - initializes worker state that requires interaction with the outer context, for example, initializing connectors. In many cases this method is optional, so it can be implemented as empty:
`func (*W) Init() error { return nil }`.
2. `Run(ctx Context) error` - starts the `Worker` instance execution. The context signals through `ctx.Done()` when the worker must stop.
Workers lifecycle:
```text
(*) -> [New] -> [Initialized] -> [Run] -> [Stopped]
          |             |           |
          |             |           ↓
          |-------------|------> [Failed]
```
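The lifecycle above can be read as a small state machine. The following is a minimal sketch of that reading, not the package's implementation: the `State` type, the `transitions` table, and `canMove` are hypothetical names introduced for illustration, and only the transitions drawn in the diagram are modeled.

```go
package main

import "fmt"

// State mirrors the worker state names from the diagram.
type State string

const (
	New         State = "New"
	Initialized State = "Initialized"
	Run         State = "Run"
	Stopped     State = "Stopped"
	Failed      State = "Failed"
)

// transitions lists only the moves drawn in the diagram.
var transitions = map[State][]State{
	New:         {Initialized, Failed},
	Initialized: {Run, Failed},
	Run:         {Stopped, Failed},
}

// canMove reports whether the diagram allows moving from one state to another.
func canMove(from, to State) bool {
	for _, next := range transitions[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canMove(New, Initialized)) // true
	fmt.Println(canMove(Run, Failed))      // true
	fmt.Println(canMove(Stopped, Run))     // false
}
```

Restart strategies may add further transitions in practice; they are left out here because the diagram does not show them.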
Example ¶
package main

import (
	"log"
	"time"
)

func main() {
	// initialize a new instance of Chief
	chief := NewChief()
	// add workers to the pool
	chief.AddWorker("dummy", NewDummy())
	// pass a handler for internal events like errors, panics, warnings, etc.
	// you can log them with your favorite logger (e.g. Logrus, Zap, etc.)
	chief.SetEventHandler(STDLogEventHandler())
	// init all registered workers and run them
	chief.Run()
}

type dummy struct{}

// NewDummy initializes a new instance of the dummy Worker.
func NewDummy() Worker {
	// In most cases, this is where we prepare some state of the worker,
	// like a logger, configuration, variables, and fields.
	return &dummy{}
}

// Init is an interface method used to initialize worker state that
// requires interaction with the outer context, for example, initializing connectors.
func (d *dummy) Init() error { return nil }

// Run starts the event loop of the worker.
func (d *dummy) Run(ctx Context) error {
	// initialize everything required for the execution flow
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop() // release the ticker when the worker stops
	for {
		select {
		case <-ticker.C:
			// define all the processing code here
			// or move it to a method and make a call here
			log.Println("do something")
		case msg := <-ctx.Messages():
			if msg == nil {
				continue
			}
			log.Printf("Received new message: sender(%s) kind(%d) data(%v)\n",
				msg.Sender, msg.Kind, msg.Data)
		case <-ctx.Done():
			// close all connections, channels and finalise state if needed
			log.Println("good bye")
			return nil
		}
	}
}
Index ¶
- Constants
- type AppInfo
- type Broker
- type Chief
- type Context
- type Event
- type EventHandler
- type EventLevel
- type IMQBroker
- type Locker
- type Mailbox
- type Message
- type MessageKind
- type NopBroker
- type NopMailbox
- type ReaderBus
- type Recover
- type RestartOption
- type SenderBus
- type Shutdown
- type StateInfo
- type Worker
- type WorkerExistRule
- type WorkerName
- type WorkerOpts
- type WorkerWithInit
Constants ¶
const (
	// StatusAction is a command useful for health-checks, because it returns status of all workers.
	StatusAction = "status"
	// PingAction is a simple command that returns the "pong" message.
	PingAction = "ping"
)
const (
	TargetBroadcast = "*"
	TargetSelfInit  = "self-init"
)
const (
	WStateNotExists   sam.State = "NotExists"
	WStateNew         sam.State = "New"
	WStateInitialized sam.State = "Initialized"
	WStateRun         sam.State = "Run"
	WStateStopped     sam.State = "Stopped"
	WStateFailed      sam.State = "Failed"
)
const (
	// Restart is a strategy to restart Worker
	// in case of panic or exit with error.
	Restart = RestartOnFail | RestartOnError
	// RestartAndReInit is a strategy to reinitialize and restart Worker
	// in case of panic or exit with error.
	RestartAndReInit = RestartOnFail | RestartOnError | RestartWithReInit
)
const DefaultForceStopTimeout = 45 * time.Second
DefaultForceStopTimeout is a timeout for killing all workers.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AppInfo ¶
type AppInfo struct {
	Name    string `json:"name"`
	Version string `json:"version"`
	Build   string `json:"build"`
	Tag     string `json:"tag"`
}
AppInfo holds the details of the *Application* build.
func (AppInfo) SocketName ¶
SocketName returns name of *Chief Service Socket*.
type Broker ¶
type Broker struct {
	// contains filtered or unexported fields
}
func (*Broker) AddWorker ¶
func (hub *Broker) AddWorker(name WorkerName) Mailbox
func (*Broker) DefaultBus ¶
type Chief ¶
type Chief interface {
	// AddWorker registers the worker in the pool.
	AddWorker(WorkerName, Worker, ...WorkerOpts) Chief
	AddWorkerAndLaunch(WorkerName, Worker, ...WorkerOpts) Chief
	// GetWorkersStates returns the current state of all registered workers.
	GetWorkersStates() map[WorkerName]sam.State
	// EnableServiceSocket initializes a `net.Socket` server for internal management purposes.
	// By default, it includes two actions:
	//   - "status" is a healthcheck-like command that returns the status of all workers;
	//   - "ping" is a simple command that returns the "pong" message.
	// Users can provide their own list of actions with handler closures.
	EnableServiceSocket(app AppInfo, actions ...socket.Action) Chief
	// Event returns the channel with internal Events.
	// > ATTENTION:
	// `Event() <-chan Event` and `SetEventHandler(EventHandler)`
	// are mutually exclusive, but one of them must be used!
	Event() <-chan Event
	// SetEventHandler adds a callback that processes the `Chief`
	// internal events and can log them or do something else.
	// > ATTENTION:
	// `Event() <-chan Event` and `SetEventHandler(EventHandler)`
	// are mutually exclusive, but one of them must be used!
	SetEventHandler(EventHandler) Chief
	// SetContext replaces the default context with the provided one.
	// It can be used to deliver some values inside `(Worker).Run(ctx Context)`.
	SetContext(context.Context) Chief
	// SetLocker sets a custom `Locker`. If it is not set,
	// the default `Locker` is used, which waits for the SIGTERM or SIGINT system signals.
	SetLocker(Locker) Chief
	// SetShutdown sets the `Shutdown` callback.
	SetShutdown(Shutdown) Chief
	// SetForceStopTimeout replaces the `DefaultForceStopTimeout`.
	// ForceStopTimeout is how long a worker may keep running after the stop signal
	// before it is killed.
	SetForceStopTimeout(time.Duration) Chief
	// UseCustomIMQBroker sets a non-standard implementation
	// of the IMQBroker to replace the default one.
	UseCustomIMQBroker(IMQBroker) Chief
	// UseNopIMQBroker replaces the default IMQ Broker with an empty stub.
	// NOP stands for no-operations.
	UseNopIMQBroker() Chief
	// Run is the main entry point into the `Chief` run loop.
	// This method initializes all added workers and, if enabled, the `net.Socket` server,
	// starts the workers in separate goroutines,
	// and waits for the end of the lock produced by the locker function.
	Run()
	// Shutdown sends the stop signal to all child goroutines
	// by triggering the `context.CancelFunc()` and
	// executes the `Shutdown` callback.
	Shutdown()
}
Chief is a supervisor that can be placed at the top of a Go app's execution stack. It blocks until SIGTERM is intercepted and then shuts down all workers gracefully. `Chief` can also be used as a child supervisor inside a `Worker` that is launched by a top-level `Chief`.
type Context ¶
Context is a wrapper over the standard `context.Context`. Its main purpose is to leave room for future extension.
type Event ¶
type Event struct {
	Level   EventLevel
	Worker  WorkerName
	Fields  map[string]interface{}
	Message string
}
Event is a message object that signals Chief's internal events and is processed by an `EventHandler`.
func ErrorEvent ¶
ErrorEvent returns new Event with `LvlError` and provided message.
func (Event) FormatFields ¶
FormatFields concatenates fields in string format: "k1=value k2=value "
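The `"k1=value k2=value "` format can be illustrated with a short sketch. `formatFields` below is a hypothetical free function, not the package's method. Sorting the keys is an assumption made here so the output is deterministic, since Go map iteration order is random; the real method may not sort.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// formatFields renders fields as "k1=value k2=value ", sorted by key.
// Sorting is an assumption of this sketch, added for stable output.
func formatFields(fields map[string]interface{}) string {
	keys := make([]string, 0, len(fields))
	for k := range fields {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	var b strings.Builder
	for _, k := range keys {
		fmt.Fprintf(&b, "%s=%v ", k, fields[k])
	}
	return b.String()
}

func main() {
	fields := map[string]interface{}{"worker": "dummy", "attempt": 2}
	fmt.Printf("%q\n", formatFields(fields)) // "attempt=2 worker=dummy "
}
```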
func (Event) SetWorker ¶
func (e Event) SetWorker(name WorkerName) Event
SetWorker sets the provided `worker` as the event source.
type EventHandler ¶
type EventHandler func(Event)
EventHandler is a callback that processes the `Chief` internal events; it can log them or do something else.
func STDLogEventHandler ¶
func STDLogEventHandler() EventHandler
STDLogEventHandler returns a callback that handles internal `Chief` events and logs them.
type EventLevel ¶
type EventLevel string
EventLevel ...
const (
	LvlFatal EventLevel = "fatal"
	LvlError EventLevel = "error"
	LvlInfo  EventLevel = "info"
)
type Locker ¶
type Locker func()
Locker is a blocking function; when it returns, `Chief` and all workers are signaled to stop.
type Mailbox ¶
func NewBus ¶
func NewBus(name WorkerName, toWorker, fromWorker chan *Message) Mailbox
type Message ¶
type Message struct {
	Target WorkerName
	Sender WorkerName
	Kind   MessageKind
	Data   interface{}
}
type MessageKind ¶
type MessageKind int
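To make the roles of `Target`, `Sender`, and the `"*"` broadcast target concrete, here is a minimal in-memory routing sketch. It is not the package's broker: `hub`, `newHub`, and `route` are hypothetical, the types are local mirrors, and the choices to skip the sender and drop messages on a full inbox are assumptions of this example.

```go
package main

import "fmt"

// Local mirrors of the package's types, redeclared so the sketch is self-contained.
type WorkerName string
type MessageKind int

type Message struct {
	Target WorkerName
	Sender WorkerName
	Kind   MessageKind
	Data   interface{}
}

const targetBroadcast WorkerName = "*" // same value as TargetBroadcast

// hub is a hypothetical in-memory router: one buffered inbox per worker.
type hub struct {
	inboxes map[WorkerName]chan *Message
}

func newHub(names ...WorkerName) *hub {
	h := &hub{inboxes: make(map[WorkerName]chan *Message)}
	for _, n := range names {
		h.inboxes[n] = make(chan *Message, 8)
	}
	return h
}

// route delivers msg to its target, or to every worker except the sender
// when the target is the broadcast marker. It returns the delivery count.
func (h *hub) route(msg *Message) int {
	delivered := 0
	for name, inbox := range h.inboxes {
		if name == msg.Sender {
			continue
		}
		if msg.Target != targetBroadcast && msg.Target != name {
			continue
		}
		select {
		case inbox <- msg:
			delivered++
		default: // inbox full: drop rather than block the router
		}
	}
	return delivered
}

func main() {
	h := newHub("a", "b", "c")
	fmt.Println(h.route(&Message{Target: "*", Sender: "a", Kind: 1, Data: "hi"})) // 2
	fmt.Println(h.route(&Message{Target: "b", Sender: "a", Data: "direct"}))      // 1
}
```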
type NopBroker ¶
type NopBroker struct{}
NopBroker is an empty IMQBroker
func (*NopBroker) AddWorker ¶
func (*NopBroker) AddWorker(name WorkerName) Mailbox
func (*NopBroker) DefaultBus ¶
type NopMailbox ¶
type NopMailbox struct{}
NopMailbox is an empty Mailbox
func (*NopMailbox) Messages ¶
func (*NopMailbox) Messages() <-chan *Message
func (*NopMailbox) SelfInit ¶
func (m *NopMailbox) SelfInit(WorkerName) Mailbox
func (*NopMailbox) Send ¶
func (*NopMailbox) Send(WorkerName, interface{})
func (*NopMailbox) SendToMany ¶
func (*NopMailbox) SendToMany(MessageKind, interface{}, ...WorkerName)
func (*NopMailbox) SendWithKind ¶
func (*NopMailbox) SendWithKind(WorkerName, MessageKind, interface{})
type ReaderBus ¶
type ReaderBus interface {
	Messages() <-chan *Message
}
func NewReaderBus ¶
func NewReaderBus(name WorkerName, toWorker chan *Message) ReaderBus
type Recover ¶
type Recover func(name WorkerName)
Recover is a function that will be used as a `defer call` to handle each worker's panic.
type RestartOption ¶
type RestartOption int
RestartOption is a behavior strategy for workers in case of error exit or panic.
const (
	// RestartOnFail strategy to restart worker ONLY if it panics.
	// Worker will be restarted by calling the Run() method again.
	RestartOnFail RestartOption = 1 << iota
	// RestartOnError strategy to restart worker
	// ONLY if the Run() method returns an error.
	// Worker will be restarted by calling the Run() method again.
	RestartOnError
	// RestartWithReInit strategy that adds reinitialization before restart.
	// RestartWithReInit works only with RestartOnFail and/or RestartOnError.
	// Worker will be reinitialized and restarted
	// by calling the Init() and Run() methods again.
	RestartWithReInit
	// NoRestart is the default strategy.
	// Worker won't be restarted.
	NoRestart RestartOption = -1
	// StopAppOnFail strategy to stop the whole app
	// if Worker panicked or exited with an error.
	StopAppOnFail RestartOption = -2
)
func (RestartOption) Is ¶
func (opt RestartOption) Is(mode RestartOption) bool
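The positive options are bit flags that combine with `|`, while the negative ones are standalone values. The sketch below copies the constant declarations and adds `has`, a hypothetical stand-in for `Is`; the actual method body is not shown in this documentation, so the handling of negative values is an assumption.

```go
package main

import "fmt"

type RestartOption int

// Same declarations as in the package documentation.
const (
	RestartOnFail RestartOption = 1 << iota
	RestartOnError
	RestartWithReInit

	NoRestart     RestartOption = -1
	StopAppOnFail RestartOption = -2
)

const Restart = RestartOnFail | RestartOnError

// has is a hypothetical stand-in for the Is method.
// Negative options are compared by equality, because -1 has every bit set
// and would otherwise match every flag.
func has(opt, mode RestartOption) bool {
	if opt < 0 || mode < 0 {
		return opt == mode
	}
	return opt&mode == mode
}

func main() {
	fmt.Println(int(RestartOnFail), int(RestartOnError), int(RestartWithReInit)) // 1 2 4
	fmt.Println(has(Restart, RestartOnError))                                    // true
	fmt.Println(has(Restart, RestartWithReInit))                                 // false
	fmt.Println(has(NoRestart, RestartOnFail))                                   // false
}
```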
type SenderBus ¶
type SenderBus interface {
	Send(target WorkerName, data interface{})
	SendWithKind(target WorkerName, kind MessageKind, data interface{})
	SendToMany(kind MessageKind, data interface{}, targets ...WorkerName)
	SelfInit(name WorkerName) Mailbox
}
func NewSenderBus ¶
type Shutdown ¶
type Shutdown func()
Shutdown is a callback function that will be executed after the Chief and workers are stopped. Its main purpose is to close, complete, or retain some global states or shared resources.
type StateInfo ¶
type StateInfo struct {
	App     AppInfo                  `json:"app"`
	Workers map[WorkerName]sam.State `json:"workers"`
}
StateInfo is the result of the `StatusAction` command.
func ParseStateInfo ¶
func ParseStateInfo(data json.RawMessage) (*StateInfo, error)
ParseStateInfo decodes `StateInfo` from the JSON response for the `StatusAction` command.
type Worker ¶
type Worker interface {
	// Run starts the `Worker` instance execution. The context signals
	// through `ctx.Done()` when the worker must stop.
	Run(ctx Context) error
}
Worker is an interface for async workers that are launched and managed by the `Chief`.
type WorkerExistRule ¶
type WorkerExistRule struct {
	AvailableWorkers map[WorkerName]struct{}
	// contains filtered or unexported fields
}
WorkerExistRule is a custom validation rule for the validation libs.
func (*WorkerExistRule) Error ¶
func (r *WorkerExistRule) Error(message string) *WorkerExistRule
Error sets the error message for the rule.
func (*WorkerExistRule) Validate ¶
func (r *WorkerExistRule) Validate(value interface{}) error
Validate checks that the service exists in the system.
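A rule of this kind typically looks the value up in a set of known names. The following sketch is a simplified, hypothetical stand-in (`existRule`), not the package's `WorkerExistRule`; which value types the real rule accepts and its error messages are not documented here, so those parts are assumptions.

```go
package main

import (
	"errors"
	"fmt"
)

type WorkerName string

// existRule is a hypothetical, simplified stand-in for WorkerExistRule.
type existRule struct {
	available map[WorkerName]struct{}
	message   string
}

// Validate accepts a string or WorkerName and checks it against the known set.
func (r *existRule) Validate(value interface{}) error {
	var name WorkerName
	switch v := value.(type) {
	case WorkerName:
		name = v
	case string:
		name = WorkerName(v)
	default:
		return errors.New("unsupported value type")
	}
	if _, ok := r.available[name]; !ok {
		return errors.New(r.message)
	}
	return nil
}

func main() {
	rule := &existRule{
		available: map[WorkerName]struct{}{"api": {}, "cron": {}},
		message:   "unknown worker",
	}
	fmt.Println(rule.Validate("api"))   // <nil>
	fmt.Println(rule.Validate("queue")) // unknown worker
}
```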
type WorkerName ¶
type WorkerName string
type WorkerOpts ¶
type WorkerOpts interface {
	// contains filtered or unexported methods
}
type WorkerWithInit ¶
type WorkerWithInit interface {
	Worker
	// Init initializes worker state that requires interaction with the outer context,
	// for example, initializing connectors. In many cases this method is optional,
	// so it can be implemented as empty: `func (*W) Init() error { return nil }`.
	Init() error
}
