Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConcurrentExecutor ¶
type ConcurrentExecutor interface {
// Run runs the provided ServiceNodeFn concurrently across the executor's
// service nodes, returning an error on failure.
Run() error
}
ConcurrentExecutor executes functions on a collection of service nodes concurrently
func NewConcurrentExecutor ¶
func NewConcurrentExecutor( nodes []ServiceNode, concurrency int, timeout time.Duration, fn ServiceNodeFn, ) ConcurrentExecutor
NewConcurrentExecutor returns a new concurrent executor
type Configuration ¶
type Configuration struct {
// OperationTimeout is read from the `operationTimeout` YAML key.
// NOTE(review): presumably applied via Options.SetOperationTimeout when non-nil — confirm.
OperationTimeout *time.Duration `yaml:"operationTimeout"`
// TransferBufferSize is read from the `transferBufferSize` YAML key.
// NOTE(review): presumably applied via Options.SetTransferBufferSize when non-nil — confirm.
TransferBufferSize *int `yaml:"transferBufferSize"`
// Retry is read from the `retry` YAML key.
// NOTE(review): presumably used to build the Retrier passed to Options.SetRetrier — confirm.
Retry *xretry.Configuration `yaml:"retry"`
// Heartbeat is read from the `heartbeat` YAML key.
// NOTE(review): presumably converted with HeartbeatConfiguration.Options — confirm.
Heartbeat *HeartbeatConfiguration `yaml:"heartbeat"`
}
Configuration is a YAML wrapper around Options
func (*Configuration) Options ¶
func (c *Configuration) Options(iopts instrument.Options) Options
Options returns `Options` corresponding to the provided struct values
type HeartbeatConfiguration ¶
type HeartbeatConfiguration struct {
// Enabled is read from the `enabled` YAML key.
// NOTE(review): presumably applied via HeartbeatOptions.SetEnabled when non-nil — confirm.
Enabled *bool `yaml:"enabled"`
// Timeout is read from the `timeout` YAML key.
// NOTE(review): presumably applied via HeartbeatOptions.SetTimeout when non-nil — confirm.
Timeout *time.Duration `yaml:"timeout"`
// Interval is read from the `interval` YAML key.
// NOTE(review): presumably applied via HeartbeatOptions.SetInterval when non-nil — confirm.
Interval *time.Duration `yaml:"interval"`
// CheckInterval is read from the `checkInterval` YAML key.
// NOTE(review): presumably applied via HeartbeatOptions.SetCheckInterval when non-nil — confirm.
CheckInterval *time.Duration `yaml:"checkInterval"`
}
HeartbeatConfiguration is a YAML compatible wrapper around HeartbeatOptions
func (*HeartbeatConfiguration) Options ¶
func (h *HeartbeatConfiguration) Options() HeartbeatOptions
Options returns HeartbeatOptions corresponding to the values in the HeartbeatConfiguration
type HeartbeatOptions ¶
type HeartbeatOptions interface {
// Validate validates the HeartbeatOptions
Validate() error
// SetEnabled sets whether heartbeating is enabled
SetEnabled(bool) HeartbeatOptions
// Enabled returns whether heartbeating is enabled
Enabled() bool
// SetNowFn sets the NowFn
SetNowFn(xclock.NowFn) HeartbeatOptions
// NowFn returns the NowFn
NowFn() xclock.NowFn
// SetInterval sets the heartbeating interval
SetInterval(time.Duration) HeartbeatOptions
// Interval returns the heartbeating interval
Interval() time.Duration
// SetCheckInterval sets the frequency with which heartbeating timeouts
// are checked
SetCheckInterval(time.Duration) HeartbeatOptions
// CheckInterval returns the frequency with which heartbeating timeouts
// are checked
CheckInterval() time.Duration
// SetTimeout sets the heartbeat timeout duration, i.e. the window of
// time after which missing heartbeats are considered erroneous
SetTimeout(time.Duration) HeartbeatOptions
// Timeout returns the heartbeat timeout duration, i.e. the window of
// time after which missing heartbeats are considered erroneous
Timeout() time.Duration
// SetHeartbeatRouter sets the heartbeat router to be used
SetHeartbeatRouter(HeartbeatRouter) HeartbeatOptions
// HeartbeatRouter returns the heartbeat router in use
HeartbeatRouter() HeartbeatRouter
}
HeartbeatOptions are the knobs to control heartbeating behavior
func NewHeartbeatOptions ¶
func NewHeartbeatOptions() HeartbeatOptions
NewHeartbeatOptions returns the default HeartbeatOptions
type HeartbeatRouter ¶
type HeartbeatRouter interface {
// HeartbeatRouter embeds hb.HeartbeaterServer, so a router also exposes
// that server interface itself.
hb.HeartbeaterServer
// Endpoint returns the router endpoint
Endpoint() string
// Register registers the specified server under the given id
Register(string, hb.HeartbeaterServer) error
// Deregister un-registers any server registered under the given id
Deregister(string) error
}
HeartbeatRouter routes heartbeats based on registered servers
func NewHeartbeatRouter ¶
func NewHeartbeatRouter(endpoint string) HeartbeatRouter
NewHeartbeatRouter returns a new heartbeat router
type Listener ¶
type Listener interface {
// OnProcessTerminate is invoked when the remote process being run terminates.
// NOTE(review): desc presumably carries a human-readable description of the
// termination — confirm.
OnProcessTerminate(node ServiceNode, desc string)
// OnHeartbeatTimeout is invoked when remote heartbeats have timed out.
// NOTE(review): lastHeartbeatTs is presumably the time of the last heartbeat
// received from the node — confirm.
OnHeartbeatTimeout(node ServiceNode, lastHeartbeatTs time.Time)
// OnOverwrite is invoked if remote agent control is overwritten by another
// coordinator
OnOverwrite(node ServiceNode, desc string)
}
Listener provides callbacks invoked upon remote process state transitions
func NewListener ¶
func NewListener( onProcessTerminate func(ServiceNode, string), onHeartbeatTimeout func(ServiceNode, time.Time), onOverwrite func(ServiceNode, string), ) Listener
NewListener creates a new listener
type OperatorClientFn ¶
type OperatorClientFn func() (*grpc.ClientConn, m3em.OperatorClient, error)
OperatorClientFn is a function that constructs a connection to, and a client for, a remote Operator
type Options ¶
type Options interface {
// Validate validates the Options
Validate() error
// SetInstrumentOptions sets the instrumentation options
SetInstrumentOptions(instrument.Options) Options
// InstrumentOptions returns the instrumentation options
InstrumentOptions() instrument.Options
// SetOperationTimeout sets the timeout for node operations
SetOperationTimeout(time.Duration) Options
// OperationTimeout returns the timeout for node operations
OperationTimeout() time.Duration
// SetRetrier sets the retrier for node operations
SetRetrier(xretry.Retrier) Options
// Retrier returns the retrier for node operations
Retrier() xretry.Retrier
// SetTransferBufferSize sets the bytes buffer size used during file transfer
SetTransferBufferSize(int) Options
// TransferBufferSize returns the bytes buffer size used during file transfer
TransferBufferSize() int
// SetMaxPullSize sets the max bytes retrieved from remote agents when
// fetching output files
SetMaxPullSize(int64) Options
// MaxPullSize returns the max bytes retrieved from remote agents when
// fetching output files
MaxPullSize() int64
// SetHeartbeatOptions sets the HeartbeatOptions
SetHeartbeatOptions(HeartbeatOptions) Options
// HeartbeatOptions returns the HeartbeatOptions
HeartbeatOptions() HeartbeatOptions
// SetOperatorClientFn sets the OperatorClientFn
SetOperatorClientFn(OperatorClientFn) Options
// OperatorClientFn returns the OperatorClientFn
OperatorClientFn() OperatorClientFn
}
Options are the various knobs to control Node behavior
func NewOptions ¶
func NewOptions( opts instrument.Options, ) Options
NewOptions returns a new Options construct.
type RemoteOutputType ¶
type RemoteOutputType int
RemoteOutputType describes the various outputs available on the remote agents
const ( // RemoteProcessStdout refers to the remote process stdout RemoteProcessStdout RemoteOutputType = iota // RemoteProcessStderr refers to the remote process stderr RemoteProcessStderr )
type ServiceNode ¶
type ServiceNode interface {
// ServiceNode embeds placement.Instance, so every node also exposes that
// interface.
placement.Instance
// Setup initializes the directories, config file, and binary for the process being tested.
// It does not Start the process on the ServiceNode.
Setup(
build build.ServiceBuild,
config build.ServiceConfiguration,
token string,
force bool,
) error
// Start starts the service process for this ServiceNode.
Start() error
// Stop stops the service process for this ServiceNode.
Stop() error
// Status returns the ServiceNode status.
Status() Status
// Teardown releases any remote resources used for testing.
Teardown() error
// Close releases any locally held resources
Close() error
// RegisterListener registers an event listener and returns the ListenerID
// identifying it
RegisterListener(Listener) ListenerID
// DeregisterListener un-registers the event listener with the given ListenerID
DeregisterListener(ListenerID)
// TransferLocalFile transfers a local file to the specified destination paths
// NB: destPaths are not allowed to use relative path specifiers, i.e. '..' is illegal;
// the eventual destination path on remote hosts is relative to the working directory
// of the remote agent.
// e.g. if the remote agent has working directory /var/m3em-agent, and we make the call:
// svcNode.TransferLocalFile("some/local/file/path/id", []string{"path/id", "another/path/id"}, false)
//
// upon success, there will be two new files under the remote agent working directory:
// /var/m3em-agent/
// /var/m3em-agent/path/id <-- same contents as "some/local/file/path/id"
// /var/m3em-agent/another/path/id <-- same contents as "some/local/file/path/id"
TransferLocalFile(localSrc string, destPaths []string, overwrite bool) error
// GetRemoteOutput transfers the remote output of the specified type to the
// specified local path.
// NOTE(review): truncated presumably reports that the remote output exceeded
// Options.MaxPullSize and was cut short — confirm.
GetRemoteOutput(t RemoteOutputType, localDest string) (truncated bool, err error)
}
ServiceNode represents an executable service node. This object controls both the service and resources on the host running the service (e.g. fs, processes, etc.)
type ServiceNodeFn ¶
type ServiceNodeFn func(ServiceNode) error
ServiceNodeFn performs an operation on a given ServiceNode
type Status ¶
type Status int
Status indicates the different states a ServiceNode can be in. The state diagram below describes the transitions between the various states:
                          ┌──────────────────┐
                          │                  │
          ┌Teardown()─────│      Error       │
          │               │                  │
          │               └──────────────────┘
          │
          ▼
┌──────────────────┐                         ┌──────────────────┐
│                  │    Setup()              │                  │
│  Uninitialized   ├────────────────────────▶│      Setup       │◀─┐
│                  │◀───────────┐            │                  │  │
└──────────────────┘  Teardown()└────────────└──────────────────┘  │
          ▲                                           │            │
          │                                           │            │
          │                                           │            │
          │                               Start()     │            │
          │                             ┌─────────────┘            │
          │                             │                          │
          │                             │                          │
          │                             │                          │
          │                             ▼                          │
          │                    ┌──────────────────┐                │
          │Teardown()          │                  │                │
          └────────────────────│     Running      │────────────Stop()
                               │                  │
                               └──────────────────┘
const ( // StatusUninitialized refers to the state of an un-initialized node. StatusUninitialized Status = iota // StatusSetup is the state of a node which has been Setup() StatusSetup // StatusRunning is the state of a node which has been Start()-ed StatusRunning // StatusError is the state of a node which is in an Error state StatusError )