Documentation
¶
Overview ¶
Package node provides distributed worker pools with supervisors.
Index ¶
- Constants
- Variables
- func AddErrPool(mach *am.Machine, err error, args am.A)
- func AddErrPoolStr(mach *am.Machine, msg string, args am.A)
- func AddErrRpc(mach *am.Machine, err error, args am.A)
- func AddErrWorker(mach *am.Machine, err error, args am.A)
- func AddErrWorkerStr(mach *am.Machine, msg string, args am.A)
- func GetClientId(name string) string
- func GetRpcClientId(name string) string
- func LogArgs(args am.A) map[string]string
- func Pass(args *A) am.A
- func PassRpc(args *A) am.A
- type A
- type ARpc
- type Client
- func (c *Client) Dispose(ctx context.Context)
- func (c *Client) ReqWorker(ctx context.Context) error
- func (c *Client) Start(nodesList []string)
- func (c *Client) StartEnd(e *am.Event)
- func (c *Client) StartEnter(e *am.Event) bool
- func (c *Client) StartState(e *am.Event)
- func (c *Client) Stop(ctx context.Context)
- func (c *Client) WorkerPayloadEnter(e *am.Event) bool
- func (c *Client) WorkerPayloadState(e *am.Event)
- func (c *Client) WorkerRequestedEnter(e *am.Event) bool
- func (c *Client) WorkerRequestedState(e *am.Event)
- type ClientOpts
- type ClientStateDeps
- type Supervisor
- func (s *Supervisor) AllWorkers() []*workerInfo
- func (s *Supervisor) AwaitingWorkerEnter(e *am.Event) bool
- func (s *Supervisor) AwaitingWorkerState(e *am.Event)
- func (s *Supervisor) BusyWorkers() []*workerInfo
- func (s *Supervisor) CheckPool() bool
- func (s *Supervisor) ClientConnectedState(e *am.Event)
- func (s *Supervisor) ClientDisconnectedEnter(e *am.Event) bool
- func (s *Supervisor) ClientDisconnectedState(e *am.Event)
- func (s *Supervisor) Dispose()
- func (s *Supervisor) ErrWorkerState(e *am.Event)
- func (s *Supervisor) ForkWorkerEnter(e *am.Event) bool
- func (s *Supervisor) ForkWorkerState(e *am.Event)
- func (s *Supervisor) ForkingWorkerEnter(e *am.Event) bool
- func (s *Supervisor) ForkingWorkerState(e *am.Event)
- func (s *Supervisor) HeartbeatState(e *am.Event)
- func (s *Supervisor) IdleWorkers() []*workerInfo
- func (s *Supervisor) InitingWorkers() []*workerInfo
- func (s *Supervisor) KillingWorkerEnter(e *am.Event) bool
- func (s *Supervisor) KillingWorkerState(e *am.Event)
- func (s *Supervisor) NormalizingPoolState(e *am.Event)
- func (s *Supervisor) PoolReadyEnter(e *am.Event) bool
- func (s *Supervisor) PoolReadyExit(e *am.Event) bool
- func (s *Supervisor) ProvideWorkerEnter(e *am.Event) bool
- func (s *Supervisor) ProvideWorkerState(e *am.Event)
- func (s *Supervisor) ReadyWorkers() []*workerInfo
- func (s *Supervisor) RpcWorkers() []*workerInfo
- func (s *Supervisor) SetPool(min, max, warm, maxPerClient int)
- func (s *Supervisor) Start(publicAddr string)
- func (s *Supervisor) StartEnd(e *am.Event)
- func (s *Supervisor) StartEnter(e *am.Event) bool
- func (s *Supervisor) StartState(e *am.Event)
- func (s *Supervisor) Stop()
- func (s *Supervisor) WorkerForkedEnter(e *am.Event) bool
- func (s *Supervisor) WorkerForkedState(e *am.Event)
- func (s *Supervisor) WorkerKilledEnter(e *am.Event) bool
- func (s *Supervisor) WorkerKilledState(e *am.Event)
- type SupervisorOpts
- type Worker
- func (w *Worker) ErrNetworkState(e *am.Event)
- func (w *Worker) HealthcheckState(e *am.Event)
- func (w *Worker) LocalRpcReadyState(e *am.Event)
- func (w *Worker) PublicRpcReadyState(e *am.Event)
- func (w *Worker) RpcReadyState(e *am.Event)
- func (w *Worker) SendPayloadEnter(e *am.Event) bool
- func (w *Worker) ServeClientEnter(e *am.Event) bool
- func (w *Worker) ServeClientState(e *am.Event)
- func (w *Worker) Start(localAddr string) am.Result
- func (w *Worker) StartEnd(e *am.Event)
- func (w *Worker) StartEnter(e *am.Event) bool
- func (w *Worker) StartState(e *am.Event)
- func (w *Worker) Stop(dispose bool)
- type WorkerOpts
Constants ¶
const (
// EnvAmNodeLogSupervisor enables machine logging for node supervisor.
EnvAmNodeLogSupervisor = "AM_NODE_LOG_SUPERVISOR"
// EnvAmNodeLogClient enables machine logging for node client.
EnvAmNodeLogClient = "AM_NODE_LOG_CLIENT"
)
Variables ¶
var (
ErrWorker = errors.New("worker error")
ErrWorkerMissing = errors.New("worker missing")
ErrWorkerHealth = errors.New("worker failed healthcheck")
ErrWorkerConn = errors.New("error starting connection")
ErrWorkerKill = errors.New("error killing worker")
ErrPool = errors.New("pool error")
ErrRpc = errors.New("rpc error")
)
Functions ¶
func AddErrPool ¶
AddErrPool wraps an error in the ErrPool sentinel and adds it to a machine.
func AddErrPoolStr ¶
AddErrPoolStr wraps a msg in the ErrPool sentinel and adds it to a machine.
func AddErrWorker ¶
AddErrWorker wraps an error in the ErrWorker sentinel and adds it to a machine.
func AddErrWorkerStr ¶
AddErrWorkerStr wraps a msg in the ErrWorker sentinel and adds it to a machine.
func GetClientId ¶
GetClientId returns a machine ID from a name.
func GetRpcClientId ¶
GetRpcClientId returns a machine ID from a name.
Types ¶
type A ¶
type A struct {
// Id is a machine ID.
Id string `log:"id"`
// PublicAddr is the public address of a Supervisor or Worker.
PublicAddr string `log:"public_addr"`
// LocalAddr is the local address of a Supervisor or Worker.
LocalAddr string `log:"local_addr"`
// NodesList is a list of available nodes (supervisors' public RPC addresses).
NodesList []string
ClientAddr string
// Worker is the RPC client connected to a Worker.
Worker *rpc.Client
// Bootstrap is the RPC machine used to connect Worker to the Supervisor.
Bootstrap *bootstrap
// Dispose the worker
Dispose bool
}
A is a struct for node arguments. It's a typesafe alternative to am.A.
type ARpc ¶
type ARpc struct {
// Id is a machine ID.
Id string `log:"id"`
// PublicAddr is the public address of a Supervisor or Worker.
PublicAddr string `log:"public_addr"`
// LocalAddr is the local address of a Supervisor, Worker, or [bootstrap].
LocalAddr string `log:"local_addr"`
// NodesList is a list of available nodes (supervisors' public RPC addresses).
NodesList []string
ClientAddr string
}
ARpc is a subset of A that can be passed over RPC.
type Client ¶
type Client struct {
*am.ExceptionHandler
Mach *am.Machine
Name string
SuperAddr string
LogEnabled bool
// LeaveSuper is a flag to leave the supervisor after connecting to the
// worker. TODO
LeaveSuper bool
// ConnTimeout is the time to wait for an outbound connection to be
// established. Default is 5 seconds.
ConnTimeout time.Duration
SuperRpc *rpc.Client
WorkerRpc *rpc.Client
// contains filtered or unexported fields
}
Client is a node client, connecting to a supervisor and then a worker.
func NewClient ¶
func NewClient(ctx context.Context, id string, workerKind string, stateDeps *ClientStateDeps, opts *ClientOpts, ) (*Client, error)
NewClient creates a new Client instance with the provided context, id, workerKind, state dependencies, and options. Returns a pointer to the Client instance and an error if any validation or initialization fails.
func (*Client) ReqWorker ¶
ReqWorker sends a request to add a "WorkerRequested" state to the client's state machine and waits for the "WorkerReady" state.
func (*Client) StartState ¶
func (*Client) Stop ¶
Stop halts the client's connection to both the supervisor and worker RPCs, and removes the client state from the state machine.
func (*Client) WorkerPayloadState ¶
WorkerPayloadState handles both Supervisor and Worker inbound payloads.
func (*Client) WorkerRequestedState ¶
type ClientOpts ¶
type ClientOpts struct {
// Parent is a parent state machine for a new client state machine. See
// [am.Opts].
Parent am.Api
}
ClientOpts provides configuration options for creating a new client state machine.
type ClientStateDeps ¶
type ClientStateDeps struct {
ClientSStruct am.Struct
ClientSNames am.S
WorkerSStruct am.Struct
WorkerSNames am.S
}
ClientStateDeps contains the state definitions and names of the client and worker machines, needed to create a new client.
type Supervisor ¶
type Supervisor struct {
*am.ExceptionHandler
Mach *am.Machine
// WorkerKind is the kind of worker this supervisor is managing.
WorkerKind string
// WorkerBin is the path and args to the worker binary.
WorkerBin []string
// Name is the name of the supervisor.
Name string
LogEnabled bool
// Max is the maximum number of workers. Default is 10.
Max int
// Min is the minimum number of workers. Default is 2.
Min int
// Warm is the number of warm (ready) workers. Default is 5.
Warm int
// MaxClientWorkers is the maximum number of workers per 1 client. Defaults to
// Max.
MaxClientWorkers int
// WorkerErrTtl is the time to keep worker errors in memory. Default is 10m.
WorkerErrTtl time.Duration
// WorkerErrRecent is the time to consider recent errors. Default is 1m.
WorkerErrRecent time.Duration
// WorkerErrKill is the number of errors to kill a worker. Default is 3.
WorkerErrKill int
// ConnTimeout is the time to wait for an outbound connection to be
// established. Default is 5s.
ConnTimeout time.Duration
DeliveryTimeout time.Duration
// PoolPause is the time to wait between normalizing the pool. Default is 5s.
PoolPause time.Duration
// HealthcheckPause is the time between trying to get a Healthcheck response
// from a worker.
HealthcheckPause time.Duration
Heartbeat time.Duration
// PublicAddr is the address for the public RPC server to listen on. The
// effective address is at [PublicMux.Addr].
PublicAddr string
// PublicMux is the public listener to create RPC servers for each client.
PublicMux *rpc.Mux
// PublicRpcs are the public RPC servers of connected clients, indexed by
// remote addresses.
PublicRpcs map[string]*rpc.Server
// LocalAddr is the address for the local RPC server to listen on. The
// effective address is at [LocalRpc.Addr].
LocalAddr string
// LocalRpc is the local RPC server, used by other supervisors to connect.
// TODO rpc/mux
LocalRpc *rpc.Server
WorkerReadyState am.HandlerFinal
WorkerGoneState am.HandlerFinal
ClientSendPayloadState am.HandlerFinal
SuperSendPayloadState am.HandlerFinal
// contains filtered or unexported fields
}
func NewSupervisor ¶
func NewSupervisor( ctx context.Context, workerKind string, workerBin []string, workerStruct am.Struct, workerSNames am.S, opts *SupervisorOpts, ) (*Supervisor, error)
NewSupervisor initializes and returns a new Supervisor instance with specified context, worker attributes, and options.
func (*Supervisor) AllWorkers ¶
func (s *Supervisor) AllWorkers() []*workerInfo
AllWorkers returns workers (in any state).
func (*Supervisor) AwaitingWorkerEnter ¶
func (s *Supervisor) AwaitingWorkerEnter(e *am.Event) bool
func (*Supervisor) AwaitingWorkerState ¶
func (s *Supervisor) AwaitingWorkerState(e *am.Event)
func (*Supervisor) BusyWorkers ¶
func (s *Supervisor) BusyWorkers() []*workerInfo
BusyWorkers returns Busy workers.
func (*Supervisor) CheckPool ¶
func (s *Supervisor) CheckPool() bool
CheckPool tries to set the pool as ready, and normalizes the pool if it is not.
func (*Supervisor) ClientConnectedState ¶
func (s *Supervisor) ClientConnectedState(e *am.Event)
func (*Supervisor) ClientDisconnectedEnter ¶
func (s *Supervisor) ClientDisconnectedEnter(e *am.Event) bool
func (*Supervisor) ClientDisconnectedState ¶
func (s *Supervisor) ClientDisconnectedState(e *am.Event)
func (*Supervisor) Dispose ¶
func (s *Supervisor) Dispose()
func (*Supervisor) ErrWorkerState ¶
func (s *Supervisor) ErrWorkerState(e *am.Event)
func (*Supervisor) ForkWorkerEnter ¶
func (s *Supervisor) ForkWorkerEnter(e *am.Event) bool
func (*Supervisor) ForkWorkerState ¶
func (s *Supervisor) ForkWorkerState(e *am.Event)
func (*Supervisor) ForkingWorkerEnter ¶
func (s *Supervisor) ForkingWorkerEnter(e *am.Event) bool
func (*Supervisor) ForkingWorkerState ¶
func (s *Supervisor) ForkingWorkerState(e *am.Event)
func (*Supervisor) HeartbeatState ¶
func (s *Supervisor) HeartbeatState(e *am.Event)
func (*Supervisor) IdleWorkers ¶
func (s *Supervisor) IdleWorkers() []*workerInfo
IdleWorkers returns Idle workers.
func (*Supervisor) InitingWorkers ¶
func (s *Supervisor) InitingWorkers() []*workerInfo
InitingWorkers returns workers being currently initialized.
func (*Supervisor) KillingWorkerEnter ¶
func (s *Supervisor) KillingWorkerEnter(e *am.Event) bool
func (*Supervisor) KillingWorkerState ¶
func (s *Supervisor) KillingWorkerState(e *am.Event)
func (*Supervisor) NormalizingPoolState ¶
func (s *Supervisor) NormalizingPoolState(e *am.Event)
func (*Supervisor) PoolReadyEnter ¶
func (s *Supervisor) PoolReadyEnter(e *am.Event) bool
func (*Supervisor) PoolReadyExit ¶
func (s *Supervisor) PoolReadyExit(e *am.Event) bool
func (*Supervisor) ProvideWorkerEnter ¶
func (s *Supervisor) ProvideWorkerEnter(e *am.Event) bool
func (*Supervisor) ProvideWorkerState ¶
func (s *Supervisor) ProvideWorkerState(e *am.Event)
func (*Supervisor) ReadyWorkers ¶
func (s *Supervisor) ReadyWorkers() []*workerInfo
ReadyWorkers returns Ready workers.
func (*Supervisor) RpcWorkers ¶
func (s *Supervisor) RpcWorkers() []*workerInfo
RpcWorkers returns workers with an RPC connection.
func (*Supervisor) SetPool ¶
func (s *Supervisor) SetPool(min, max, warm, maxPerClient int)
SetPool sets the pool parameters with defaults.
func (*Supervisor) Start ¶
func (s *Supervisor) Start(publicAddr string)
func (*Supervisor) StartEnd ¶
func (s *Supervisor) StartEnd(e *am.Event)
func (*Supervisor) StartEnter ¶
func (s *Supervisor) StartEnter(e *am.Event) bool
func (*Supervisor) StartState ¶
func (s *Supervisor) StartState(e *am.Event)
func (*Supervisor) Stop ¶
func (s *Supervisor) Stop()
func (*Supervisor) WorkerForkedEnter ¶
func (s *Supervisor) WorkerForkedEnter(e *am.Event) bool
func (*Supervisor) WorkerForkedState ¶
func (s *Supervisor) WorkerForkedState(e *am.Event)
func (*Supervisor) WorkerKilledEnter ¶
func (s *Supervisor) WorkerKilledEnter(e *am.Event) bool
func (*Supervisor) WorkerKilledState ¶
func (s *Supervisor) WorkerKilledState(e *am.Event)
type SupervisorOpts ¶
type Worker ¶
type Worker struct {
*am.ExceptionHandler
Mach *am.Machine
Name string
Kind string
// AcceptClient is the ID of a client, passed by the supervisor. Worker should
// only accept connections from this client.
AcceptClient string
// ConnTimeout is the time to wait for an outbound connection to be
// established.
ConnTimeout time.Duration
DeliveryTimeout time.Duration
// BootAddr is the address of the bootstrap machine.
BootAddr string
// BootRpc is the RPC client connection to bootstrap machine, which passes
// connection info to the Supervisor.
BootRpc *rpc.Client
// LocalAddr is the address of the local RPC server.
LocalAddr string
// LocalRpc is the local RPC server, used by the Supervisor to connect.
LocalRpc *rpc.Server
// PublicAddr is the address of the public RPC server.
PublicAddr string
// PublicRpc is the public RPC server, used by the Client to connect.
PublicRpc *rpc.Server
}
func NewWorker ¶
func NewWorker(ctx context.Context, kind string, workerStruct am.Struct, stateNames am.S, opts *WorkerOpts, ) (*Worker, error)
NewWorker initializes a new Worker instance and returns it, or an error if validation fails.