Documentation
¶
Overview ¶
Package node provides distributed worker pools with supervisors.
Index ¶
- Constants
- Variables
- func AddErrPool(mach *am.Machine, err error, args am.A) error
- func AddErrPoolStr(mach *am.Machine, msg string, args am.A) error
- func AddErrRpc(mach *am.Machine, err error, args am.A) error
- func AddErrWorker(event *am.Event, mach *am.Machine, err error, args am.A) error
- func AddErrWorkerStr(mach *am.Machine, msg string, args am.A) error
- func GetSuperClientId(name string) string
- func GetWorkerClientId(name string) string
- func LogArgs(args am.A) map[string]string
- func Pass(args *A) am.A
- func PassRpc(args *A) am.A
- type A
- type ARpc
- type Client
- func (c *Client) Dispose(ctx context.Context)
- func (c *Client) ReqWorker(ctx context.Context) error
- func (c *Client) ServerPayloadEnter(e *am.Event) bool
- func (c *Client) ServerPayloadState(e *am.Event)
- func (c *Client) Start(nodesList []string)
- func (c *Client) StartEnd(e *am.Event)
- func (c *Client) StartEnter(e *am.Event) bool
- func (c *Client) StartState(e *am.Event)
- func (c *Client) Stop(ctx context.Context)
- func (c *Client) WorkerRequestedEnter(e *am.Event) bool
- func (c *Client) WorkerRequestedState(e *am.Event)
- type ClientOpts
- type Supervisor
- func (s *Supervisor) CheckPool() bool
- func (s *Supervisor) ClientConnectedState(e *am.Event)
- func (s *Supervisor) ClientDisconnectedEnter(e *am.Event) bool
- func (s *Supervisor) ClientDisconnectedState(e *am.Event)
- func (s *Supervisor) Dispose()
- func (s *Supervisor) ErrWorkerState(e *am.Event)
- func (s *Supervisor) ForkWorkerEnter(e *am.Event) bool
- func (s *Supervisor) ForkWorkerState(e *am.Event)
- func (s *Supervisor) ForkingWorkerEnter(e *am.Event) bool
- func (s *Supervisor) ForkingWorkerState(e *am.Event)
- func (s *Supervisor) HeartbeatState(e *am.Event)
- func (s *Supervisor) KillingWorkerEnter(e *am.Event) bool
- func (s *Supervisor) KillingWorkerState(e *am.Event)
- func (s *Supervisor) ListWorkersEnter(e *am.Event) bool
- func (s *Supervisor) ListWorkersState(e *am.Event)
- func (s *Supervisor) NormalizingPoolState(e *am.Event)
- func (s *Supervisor) PoolReadyEnter(e *am.Event) bool
- func (s *Supervisor) PoolReadyExit(e *am.Event) bool
- func (s *Supervisor) ProvideWorkerEnter(e *am.Event) bool
- func (s *Supervisor) ProvideWorkerState(e *am.Event)
- func (s *Supervisor) SetPool(min, max, warm, maxPerClient int)
- func (s *Supervisor) SetWorkerEnter(e *am.Event) bool
- func (s *Supervisor) SetWorkerState(e *am.Event)
- func (s *Supervisor) Start(publicAddr string)
- func (s *Supervisor) StartEnd(e *am.Event)
- func (s *Supervisor) StartEnter(e *am.Event) bool
- func (s *Supervisor) StartState(e *am.Event)
- func (s *Supervisor) Stop()
- func (s *Supervisor) WorkerConnectedEnter(e *am.Event) bool
- func (s *Supervisor) WorkerConnectedState(e *am.Event)
- func (s *Supervisor) WorkerForkedEnter(e *am.Event) bool
- func (s *Supervisor) WorkerForkedState(e *am.Event)
- func (s *Supervisor) WorkerKilledEnter(e *am.Event) bool
- func (s *Supervisor) WorkerKilledState(e *am.Event)
- func (s *Supervisor) Workers(ctx context.Context, state WorkerState) ([]*workerInfo, error)
- type SupervisorOpts
- type Worker
- func (w *Worker) ErrNetworkState(e *am.Event)
- func (w *Worker) HealthcheckState(e *am.Event)
- func (w *Worker) LocalRpcReadyState(e *am.Event)
- func (w *Worker) PublicRpcReadyState(e *am.Event)
- func (w *Worker) RpcReadyState(e *am.Event)
- func (w *Worker) SendPayloadEnter(e *am.Event) bool
- func (w *Worker) ServeClientEnter(e *am.Event) bool
- func (w *Worker) ServeClientState(e *am.Event)
- func (w *Worker) Start(bootAddr string) am.Result
- func (w *Worker) StartEnd(e *am.Event)
- func (w *Worker) StartEnter(e *am.Event) bool
- func (w *Worker) StartState(e *am.Event)
- func (w *Worker) Stop(dispose bool)
- type WorkerOpts
- type WorkerState
Constants ¶
// Environment variable names recognized by the node package.
const (
	// EnvAmNodeLogSupervisor enables extra logging for node supervisor.
	EnvAmNodeLogSupervisor = "AM_NODE_LOG_SUPERVISOR"
	// EnvAmNodeLogClient enables extra logging for node client.
	EnvAmNodeLogClient = "AM_NODE_LOG_CLIENT"
)

// APrefix is the string prefix used by the node package.
// NOTE(review): presumably the key/prefix under which node arguments (A) are
// stored inside am.A - confirm against Pass and PassRpc.
const APrefix = "am_node"
Variables ¶
// Sentinel errors of the node package. Compare against them with errors.Is;
// the AddErr* helpers wrap concrete errors in ErrWorker, ErrPool and ErrRpc.
var (
	ErrWorker        = errors.New("worker error")
	ErrWorkerMissing = errors.New("worker missing")
	ErrWorkerHealth  = errors.New("worker failed healthcheck")
	ErrWorkerConn    = errors.New("error starting connection")
	ErrWorkerKill    = errors.New("error killing worker")
	ErrPool          = errors.New("pool error")
	ErrHeartbeat     = errors.New("heartbeat failed")
	ErrRpc           = errors.New("rpc error")
)
Functions ¶
func AddErrPool ¶
AddErrPool wraps an error in the ErrPool sentinel and adds to a machine. TODO add event param
func AddErrPoolStr ¶
AddErrPoolStr wraps a msg in the ErrPool sentinel and adds to a machine. TODO add event param
func AddErrRpc ¶
AddErrRpc wraps an error in the ErrRpc sentinel and adds to a machine. TODO add event param
func AddErrWorker ¶
AddErrWorker wraps an error in the ErrWorker sentinel and adds to a machine.
func AddErrWorkerStr ¶
AddErrWorkerStr wraps a msg in the ErrWorker sentinel and adds to a machine. TODO add event param
func GetSuperClientId ¶ added in v0.9.0
GetSuperClientId returns a Node Supervisor machine ID from a name.
func GetWorkerClientId ¶ added in v0.9.0
GetWorkerClientId returns a Node Client machine ID from a name.
Types ¶
type A ¶
// A carries the typed arguments used by the node package. Fields with a `log`
// struct tag declare the key they are logged under; every tagged field uses
// its own key so that values cannot overwrite each other in a keyed log map
// (the keys match the ones used by ARpc).
type A struct {
// Id is a machine ID.
Id string `log:"id"`
// PublicAddr is the public address of a Supervisor or WorkerRpc.
PublicAddr string `log:"public_addr"`
// LocalAddr is the local address of a Supervisor or WorkerRpc.
LocalAddr string `log:"local_addr"`
// BootAddr is the local address of the Bootstrap machine.
BootAddr string `log:"boot_addr"`
// NodesList is a list of available nodes (supervisors' public RPC addresses).
NodesList []string
// WorkerRpcId is a machine ID of the worker RPC client.
WorkerRpcId string `log:"worker_rpc_id"`
// SuperRpcId is a machine ID of the super RPC client.
SuperRpcId string `log:"super_rpc_id"`
// WorkerRpc is the RPC client connected to a WorkerRpc.
WorkerRpc *rpc.Client
// Bootstrap is the RPC machine used to connect WorkerRpc to the Supervisor.
Bootstrap *bootstrap
// Dispose the worker.
Dispose bool
// WorkerAddr is an index for WorkerInfo.
WorkerAddr string
// WorkerInfo describes a worker.
WorkerInfo *workerInfo
// WorkersCh returns a list of workers. This channel has to be buffered.
WorkersCh chan<- []*workerInfo
// WorkerState is a requested state of workers, eg for listings.
WorkerState WorkerState
}
A is a struct for node arguments. It's a typesafe alternative to am.A.
type ARpc ¶
// ARpc holds the string-only fields of A, all tagged with their own `log` key.
type ARpc struct {
// Id is a machine ID.
Id string `log:"id"`
// PublicAddr is the public address of a Supervisor or Worker.
PublicAddr string `log:"public_addr"`
// LocalAddr is the local address of a Supervisor, Worker, or [bootstrap].
LocalAddr string `log:"local_addr"`
// BootAddr is the local address of the Bootstrap machine.
BootAddr string `log:"boot_addr"`
// NodesList is a list of available nodes (supervisors' public RPC addresses).
NodesList []string
// WorkerRpcId is a machine ID of the worker RPC client.
WorkerRpcId string `log:"worker_rpc_id"`
// SuperRpcId is a machine ID of the super RPC client.
SuperRpcId string `log:"super_rpc_id"`
}
ARpc is a subset of A, that can be passed over RPC.
type Client ¶
// Client is a node client; it embeds am.ExceptionHandler and exposes its
// state machine and RPC clients as fields.
type Client struct {
*am.ExceptionHandler
// Mach is the client's state machine.
Mach *am.Machine
// Name is the name of the client.
// NOTE(review): presumably derived from the clientId passed to NewClient -
// confirm.
Name string
// SuperAddr is the address of the supervisor.
// NOTE(review): presumably the supervisor this client is connected to -
// confirm.
SuperAddr string
// LogEnabled presumably toggles the extra client logging (see
// EnvAmNodeLogClient) - TODO confirm.
LogEnabled bool
// LeaveSuper is a flag to leave the supervisor after connecting to the
// worker. TODO
LeaveSuper bool
// ConnTimeout is the time to wait for an outbound connection to be
// established. Default is 5 seconds.
ConnTimeout time.Duration
// RpcSuper is the RPC client for the supervisor connection.
RpcSuper *rpc.Client
// RpcWorker is the RPC client for the worker connection.
RpcWorker *rpc.Client
// contains filtered or unexported fields
}
Client is a node client, connecting to a supervisor and then a worker.
func NewClient ¶
func NewClient(ctx context.Context, clientId string, workerKind string, workerSchema am.Schema, opts *ClientOpts, ) (*Client, error)
NewClient creates a new Client instance with the provided context, id, workerKind, state dependencies, and options. Returns a pointer to the Client instance and an error if any validation or initialization fails.
workerKind: any string used to build IDs and address workers
func (*Client) ReqWorker ¶
ReqWorker sends a request to add a "WorkerRequested" state to the client's state machine and waits for "WorkerReady" state.
func (*Client) ServerPayloadEnter ¶ added in v0.18.0
func (*Client) ServerPayloadState ¶ added in v0.18.0
ServerPayloadState handles both Supervisor and Worker inbound payloads, but this shared code only deals with states.ClientStatesDef.WorkerRequested.
func (*Client) StartState ¶
func (*Client) Stop ¶
Stop halts the client's connection to both the supervisor and worker RPCs, and removes the client state from the state machine.
func (*Client) WorkerRequestedState ¶
type ClientOpts ¶
// ClientOpts holds the optional settings accepted by NewClient.
type ClientOpts struct {
// Parent is a parent state machine for a new client state machine. See
// [am.Opts].
Parent am.Api
// Tags is a list of tags.
// NOTE(review): presumably attached to the new client state machine (see
// [am.Opts]) - confirm.
Tags []string
// Optional schema for the client. Should extend [states.ClientStatesDef].
ClientSchema am.Schema
// Optional state names for ClientSchema.
ClientStates am.S
}
ClientOpts provides configuration options for creating a new client state machine.
type Supervisor ¶
// Supervisor manages a pool of workers of a single kind. It embeds
// am.ExceptionHandler and exposes its pool limits, timeouts and RPC endpoints
// as fields.
type Supervisor struct {
*am.ExceptionHandler
// Mach is the supervisor's state machine.
Mach *am.Machine
// WorkerKind is the kind of worker this supervisor is managing.
WorkerKind string
// WorkerBin is the path and args to the worker binary.
WorkerBin []string
// Name is the name of the supervisor.
Name string
// LogEnabled presumably toggles the extra supervisor logging (see
// EnvAmNodeLogSupervisor) - TODO confirm.
LogEnabled bool
// Max is the maximum number of workers. Default is 10.
Max int
// Min is the minimum number of workers. Default is 2.
Min int
// Warm is the number of warm (ready) workers. Default is 5.
Warm int
// MaxClientWorkers is the maximum number of workers per 1 client. Defaults to
// Max.
MaxClientWorkers int
// WorkerErrTtl is the time to keep worker errors in memory. Default is 10m.
WorkerErrTtl time.Duration
// WorkerErrRecent is the time to consider recent errors. Default is 1m.
WorkerErrRecent time.Duration
// WorkerErrKill is the number of errors to kill a worker. Default is 3.
WorkerErrKill int
// ConnTimeout is the time to wait for an outbound connection to be
// established. Default is 5s.
ConnTimeout time.Duration
// DeliveryTimeout is a timeout for RPC delivery.
DeliveryTimeout time.Duration
// OpTimeout is the default timeout for operations (eg getters).
OpTimeout time.Duration
// PoolPause is the time to wait between normalizing the pool. Default is 5s.
PoolPause time.Duration
// HealthcheckPause is the time between trying to get a Healthcheck response
// from a worker.
HealthcheckPause time.Duration
// Heartbeat is the frequency of the Heartbeat state, which normalizes the
// pool and checks workers. Default 1m.
Heartbeat time.Duration
// WorkerCheckInterval defines how often to pull a worker's state. Default 1s.
WorkerCheckInterval time.Duration
// PublicAddr is the address for the public RPC server to listen on. The
// effective address is at [PublicMux.Addr].
PublicAddr string
// PublicMux is the public listener to create RPC servers for each client.
PublicMux *rpc.Mux
// PublicRpcs are the public RPC servers of connected clients, indexed by
// remote addresses.
PublicRpcs map[string]*rpc.Server
// LocalAddr is the address for the local RPC server to listen on. The
// effective address is at [LocalRpc.Addr].
LocalAddr string
// LocalRpc is the local RPC server, used by other supervisors to connect.
// TODO rpc/mux
LocalRpc *rpc.Server
// TestFork and TestKill are function hooks taking a single string.
// NOTE(review): they look like test replacements for forking and killing a
// worker process; the meaning of the string argument is not shown here -
// confirm.
TestFork func(string) error
TestKill func(string) error
// The fields below are state handlers stored as am.HandlerFinal values.
// NOTE(review): where they are assigned is not visible in this listing -
// confirm before relying on them being non-nil.
WorkerReadyState am.HandlerFinal
WorkerGoneState am.HandlerFinal
KillWorkerState am.HandlerFinal
ClientSendPayloadState am.HandlerFinal
SuperSendPayloadState am.HandlerFinal
HealthcheckState am.HandlerFinal
// contains filtered or unexported fields
}
func NewSupervisor ¶
func NewSupervisor( ctx context.Context, workerKind string, workerBin []string, workerSchema am.Schema, opts *SupervisorOpts, ) (*Supervisor, error)
NewSupervisor initializes and returns a new Supervisor instance with specified context, worker attributes, and options.
workerBin: path to run the worker binary file (for exec.CommandContext).
func (*Supervisor) CheckPool ¶
func (s *Supervisor) CheckPool() bool
CheckPool tries to set pool as ready and normalizes it, if not.
func (*Supervisor) ClientConnectedState ¶
func (s *Supervisor) ClientConnectedState(e *am.Event)
func (*Supervisor) ClientDisconnectedEnter ¶
func (s *Supervisor) ClientDisconnectedEnter(e *am.Event) bool
func (*Supervisor) ClientDisconnectedState ¶
func (s *Supervisor) ClientDisconnectedState(e *am.Event)
func (*Supervisor) Dispose ¶
func (s *Supervisor) Dispose()
func (*Supervisor) ErrWorkerState ¶
func (s *Supervisor) ErrWorkerState(e *am.Event)
func (*Supervisor) ForkWorkerEnter ¶
func (s *Supervisor) ForkWorkerEnter(e *am.Event) bool
func (*Supervisor) ForkWorkerState ¶
func (s *Supervisor) ForkWorkerState(e *am.Event)
func (*Supervisor) ForkingWorkerEnter ¶
func (s *Supervisor) ForkingWorkerEnter(e *am.Event) bool
func (*Supervisor) ForkingWorkerState ¶
func (s *Supervisor) ForkingWorkerState(e *am.Event)
func (*Supervisor) HeartbeatState ¶
func (s *Supervisor) HeartbeatState(e *am.Event)
func (*Supervisor) KillingWorkerEnter ¶
func (s *Supervisor) KillingWorkerEnter(e *am.Event) bool
func (*Supervisor) KillingWorkerState ¶
func (s *Supervisor) KillingWorkerState(e *am.Event)
func (*Supervisor) ListWorkersEnter ¶ added in v0.10.0
func (s *Supervisor) ListWorkersEnter(e *am.Event) bool
func (*Supervisor) ListWorkersState ¶ added in v0.10.0
func (s *Supervisor) ListWorkersState(e *am.Event)
func (*Supervisor) NormalizingPoolState ¶
func (s *Supervisor) NormalizingPoolState(e *am.Event)
func (*Supervisor) PoolReadyEnter ¶
func (s *Supervisor) PoolReadyEnter(e *am.Event) bool
func (*Supervisor) PoolReadyExit ¶
func (s *Supervisor) PoolReadyExit(e *am.Event) bool
func (*Supervisor) ProvideWorkerEnter ¶
func (s *Supervisor) ProvideWorkerEnter(e *am.Event) bool
func (*Supervisor) ProvideWorkerState ¶
func (s *Supervisor) ProvideWorkerState(e *am.Event)
func (*Supervisor) SetPool ¶
func (s *Supervisor) SetPool(min, max, warm, maxPerClient int)
SetPool sets the pool parameters with defaults.
func (*Supervisor) SetWorkerEnter ¶ added in v0.10.0
func (s *Supervisor) SetWorkerEnter(e *am.Event) bool
func (*Supervisor) SetWorkerState ¶ added in v0.10.0
func (s *Supervisor) SetWorkerState(e *am.Event)
func (*Supervisor) Start ¶
func (s *Supervisor) Start(publicAddr string)
func (*Supervisor) StartEnd ¶
func (s *Supervisor) StartEnd(e *am.Event)
func (*Supervisor) StartEnter ¶
func (s *Supervisor) StartEnter(e *am.Event) bool
func (*Supervisor) StartState ¶
func (s *Supervisor) StartState(e *am.Event)
func (*Supervisor) Stop ¶
func (s *Supervisor) Stop()
func (*Supervisor) WorkerConnectedEnter ¶ added in v0.9.0
func (s *Supervisor) WorkerConnectedEnter(e *am.Event) bool
func (*Supervisor) WorkerConnectedState ¶ added in v0.9.0
func (s *Supervisor) WorkerConnectedState(e *am.Event)
func (*Supervisor) WorkerForkedEnter ¶
func (s *Supervisor) WorkerForkedEnter(e *am.Event) bool
func (*Supervisor) WorkerForkedState ¶
func (s *Supervisor) WorkerForkedState(e *am.Event)
func (*Supervisor) WorkerKilledEnter ¶
func (s *Supervisor) WorkerKilledEnter(e *am.Event) bool
func (*Supervisor) WorkerKilledState ¶
func (s *Supervisor) WorkerKilledState(e *am.Event)
func (*Supervisor) Workers ¶ added in v0.10.0
func (s *Supervisor) Workers( ctx context.Context, state WorkerState, ) ([]*workerInfo, error)
Workers returns a list of workers in a desired state. If [ctx] expires, it will return nil, nil.
type SupervisorOpts ¶
type Worker ¶
// Worker is a node worker. It embeds am.ExceptionHandler and exposes its
// state machine plus the bootstrap, local and public RPC endpoints as fields.
type Worker struct {
*am.ExceptionHandler
// Mach is the worker's state machine.
Mach *am.Machine
// Name is the name of the worker.
Name string
// Kind is the kind of this worker.
// NOTE(review): presumably the kind passed to NewWorker and matching
// Supervisor.WorkerKind - confirm.
Kind string
// AcceptClient is the ID of a client, passed by the supervisor. Worker should
// only accept connections from this client.
AcceptClient string
// ConnTimeout is the time to wait for an outbound connection to be
// established.
ConnTimeout time.Duration
// DeliveryTimeout is presumably a timeout for RPC delivery, as on
// Supervisor - TODO confirm.
DeliveryTimeout time.Duration
// BootAddr is the address of the bootstrap machine.
BootAddr string
// BootRpc is the RPC client connection to bootstrap machine, which passes
// connection info to the Supervisor.
BootRpc *rpc.Client
// LocalAddr is the address of the local RPC server.
LocalAddr string
// LocalRpc is the local RPC server, used by the Supervisor to connect.
LocalRpc *rpc.Server
// PublicAddr is the address of the public RPC server.
PublicAddr string
// PublicRpc is the public RPC server, used by the Client to connect.
PublicRpc *rpc.Server
}
func NewWorker ¶
func NewWorker(ctx context.Context, kind string, workerStruct am.Schema, stateNames am.S, opts *WorkerOpts, ) (*Worker, error)
NewWorker initializes a new Worker instance and returns it, or an error if validation fails.
func (*Worker) ErrNetworkState ¶
func (*Worker) HealthcheckState ¶
func (*Worker) LocalRpcReadyState ¶
func (*Worker) PublicRpcReadyState ¶
func (*Worker) RpcReadyState ¶
func (*Worker) ServeClientState ¶
func (*Worker) StartState ¶
type WorkerOpts ¶
type WorkerState ¶ added in v0.10.0
// WorkerState names a state of a worker.
type WorkerState string

// states of a worker
const (
	StateIniting WorkerState = "initing"
	StateRpc     WorkerState = "rpc"
	StateIdle    WorkerState = "idle"
	StateBusy    WorkerState = "busy"
	StateReady   WorkerState = "ready"
)