Documentation
¶
Overview ¶
Package worker contains code which executes a task.
Index ¶
- func DownloadInputs(pctx context.Context, inputs []*tes.Input, store storage.Storage, ...) error
- func FlattenInputs(ctx context.Context, inputs []*tes.Input, store storage.Storage, ...) ([]*tes.Input, error)
- func FlattenOutputs(ctx context.Context, outputs []*tes.Output, store storage.Storage, ...) ([]*tes.Output, error)
- func UploadOutputs(ctx context.Context, outputs []*tes.Output, store storage.Storage, ...) ([]*tes.OutputFileLog, error)
- type Base64TaskReader
- type Command
- type DefaultWorker
- type DockerCommand
- func (docker DockerCommand) GetEnvArgs() string
- func (docker DockerCommand) GetImage() string
- func (docker *DockerCommand) GetStderr() io.Writer
- func (docker *DockerCommand) GetStdout() io.Writer
- func (docker *DockerCommand) InspectContainer(ctx context.Context) DockerCommand
- func (docker DockerCommand) Run(ctx context.Context) error
- func (docker DockerCommand) Stop() error
- func (docker *DockerCommand) SyncAPIVersion() error
- type DockerVersion
- type Executor
- type FileMapper
- func (mapper *FileMapper) AddInput(input *tes.Input) error
- func (mapper *FileMapper) AddOutput(output *tes.Output) error
- func (mapper *FileMapper) AddTmpVolume(mountPoint string) error
- func (mapper *FileMapper) AddVolume(hostPath string, mountPoint string, readonly bool) error
- func (mapper *FileMapper) Cleanup() error
- func (mapper *FileMapper) ContainerPath(src string) string
- func (mapper *FileMapper) CopyInputsToScratch(scratchDir string) error
- func (mapper *FileMapper) CopyOutputsToWorkDir(scratchDir string) error
- func (mapper *FileMapper) CreateHostFile(src string) (*os.File, error)
- func (mapper *FileMapper) HostPath(src string) (string, error)
- func (mapper *FileMapper) HostScratchPath(src string) (string, error)
- func (mapper *FileMapper) IsSubpath(p string, base string) bool
- func (mapper *FileMapper) MapTask(task *tes.Task) error
- func (mapper *FileMapper) OpenHostFile(src string) (*os.File, error)
- type FileTaskReader
- type GenericTaskReader
- type K8sExecutorErr
- type K8sSystemErr
- type KubernetesCommand
- type RPCTaskReader
- type TaskCommand
- type TaskReader
- type Volume
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DownloadInputs ¶
func DownloadInputs(pctx context.Context, inputs []*tes.Input, store storage.Storage, ev *events.TaskWriter, parallelLimit int) error
DownloadInputs downloads the given inputs.
func FlattenInputs ¶
func FlattenInputs(ctx context.Context, inputs []*tes.Input, store storage.Storage, ev *events.TaskWriter) ([]*tes.Input, error)
FlattenInputs flattens any directory inputs into a list of file inputs. A warning event will be generated if an input directory is empty.
func FlattenOutputs ¶
func FlattenOutputs(ctx context.Context, outputs []*tes.Output, store storage.Storage, ev *events.TaskWriter) ([]*tes.Output, error)
FlattenOutputs flattens output directories into a list of files. A warning event will be generated if an output directory is empty.
func UploadOutputs ¶
func UploadOutputs(ctx context.Context, outputs []*tes.Output, store storage.Storage, ev *events.TaskWriter, parallelLimit int) ([]*tes.OutputFileLog, error)
UploadOutputs uploads the outputs.
Types ¶
type Base64TaskReader ¶
type Base64TaskReader struct {
// contains filtered or unexported fields
}
Base64TaskReader reads a task from a base64 encoded string.
func NewBase64TaskReader ¶
func NewBase64TaskReader(raw string) (*Base64TaskReader, error)
NewBase64TaskReader creates a new Base64TaskReader.
type Command ¶
type DefaultWorker ¶
type DefaultWorker struct {
Executor Executor
Conf *config.Worker
Store storage.Storage
TaskReader TaskReader
EventWriter events.Writer
Command
}
DefaultWorker is the default task worker, which follows a basic, sequential process of task initialization, execution, finalization, and logging.
func (*DefaultWorker) Close ¶
func (r *DefaultWorker) Close()
type DockerCommand ¶
type DockerCommand struct {
Id string
Name string
Volumes []Volume
Workdir string
RemoveContainer bool
Event *events.ExecutorWriter
DriverCommand string
RunCommand string // template string
PullCommand string // template string
StopCommand string // template string
EnableTags bool
Tags map[string]string
Command
}
DockerCommand is responsible for configuring and running a docker container.
func (DockerCommand) GetEnvArgs ¶ added in v0.11.9
func (docker DockerCommand) GetEnvArgs() string
func (DockerCommand) GetImage ¶
func (docker DockerCommand) GetImage() string
func (*DockerCommand) GetStderr ¶
func (docker *DockerCommand) GetStderr() io.Writer
func (*DockerCommand) GetStdout ¶
func (docker *DockerCommand) GetStdout() io.Writer
func (*DockerCommand) InspectContainer ¶
func (docker *DockerCommand) InspectContainer(ctx context.Context) DockerCommand
InspectContainer inspects the docker container for metadata.
func (DockerCommand) Run ¶
func (docker DockerCommand) Run(ctx context.Context) error
Run runs the Docker command and blocks until done.
func (*DockerCommand) SyncAPIVersion ¶
func (docker *DockerCommand) SyncAPIVersion() error
SyncAPIVersion ensures that the client uses the same API version as the server.
type DockerVersion ¶
type Executor ¶
type Executor struct {
// "docker" or "kubernetes"
Backend string
// Kubernetes executor template
Template string
// Kubernetes persistent volume template
PVTemplate string
// Kubernetes persistent volume claim template
PVCTemplate string
// Funnel Server namespace
Namespace string
// Funnel Worker + Executor namespace
JobsNamespace string
// Kubernetes service account name
ServiceAccount string
// Kubernetes service account template
ServiceAccountTemplate string
// Kubernetes role template
RoleTemplate string
// Kubernetes role binding template
RoleBindingTemplate string
// NodeSelector for scheduling jobs onto specific nodes
NodeSelector map[string]string
// Tolerations for scheduling jobs onto specific nodes
Tolerations []map[string]interface{}
// Resources specifies the default resource requirements for Kubernetes jobs.
Resources *config.KubernetesResources
}
Executor holds the configuration of the task executor.
type FileMapper ¶
type FileMapper struct {
Volumes []Volume
Inputs []*tes.Input
Outputs []*tes.Output
WorkDir string
ScratchDir string
}
FileMapper is responsible for mapping paths into a working directory on the worker's host file system.
Every task needs its own directory to work in. When a file is downloaded for a task, it needs to be stored in the task's working directory. The same applies to task outputs, uploads, stdin/out/err, etc. FileMapper helps the worker engine manage all these paths.
func NewFileMapper ¶
func NewFileMapper(dir string) *FileMapper
NewFileMapper returns a new FileMapper, which maps files into the given base directory.
func (*FileMapper) AddInput ¶
func (mapper *FileMapper) AddInput(input *tes.Input) error
AddInput adds an input to the mapped files for the given tes.Input. A copy of the tes.Input will be added to mapper.Inputs, with the "Path" field updated to the mapped host path.
If the path can't be mapped, an error is returned.
func (*FileMapper) AddOutput ¶
func (mapper *FileMapper) AddOutput(output *tes.Output) error
AddOutput adds an output to the mapped files for the given tes.Output. A copy of the tes.Output will be added to mapper.Outputs, with the "Path" field updated to the mapped host path.
If the path can't be mapped, an error is returned.
func (*FileMapper) AddTmpVolume ¶
func (mapper *FileMapper) AddTmpVolume(mountPoint string) error
AddTmpVolume creates a directory on the host based on the declared path in the container and adds it to mapper.Volumes.
If the path can't be mapped, an error is returned.
func (*FileMapper) AddVolume ¶
func (mapper *FileMapper) AddVolume(hostPath string, mountPoint string, readonly bool) error
AddVolume adds a mapped volume to the mapper. A corresponding Volume record is added to mapper.Volumes.
If the volume paths are invalid or can't be mapped, an error is returned.
func (*FileMapper) Cleanup ¶
func (mapper *FileMapper) Cleanup() error
Cleanup deletes the working directory.
func (*FileMapper) ContainerPath ¶
func (mapper *FileMapper) ContainerPath(src string) string
ContainerPath returns an unmapped path.
The mapper's base dir is stripped from the path. e.g. If the mapper is configured with a base dir of "/tmp/mapped_files", then mapper.ContainerPath("/tmp/mapped_files/home/ubuntu/myfile") will return "/home/ubuntu/myfile".
func (*FileMapper) CopyInputsToScratch ¶
func (mapper *FileMapper) CopyInputsToScratch(scratchDir string) error
func (*FileMapper) CopyOutputsToWorkDir ¶
func (mapper *FileMapper) CopyOutputsToWorkDir(scratchDir string) error
func (*FileMapper) CreateHostFile ¶
func (mapper *FileMapper) CreateHostFile(src string) (*os.File, error)
CreateHostFile creates a file on the host file system at a mapped path. "src" is an unmapped path. This function will handle mapping the path.
This function calls os.Create.
If the path can't be mapped or the file can't be created, an error is returned.
func (*FileMapper) HostPath ¶
func (mapper *FileMapper) HostPath(src string) (string, error)
HostPath returns a mapped path.
The path is concatenated to the mapper's base dir. e.g. If the mapper is configured with a base dir of "/tmp/mapped_files", then mapper.HostPath("/home/ubuntu/myfile") will return "/tmp/mapped_files/home/ubuntu/myfile".
The mapped path is required to be a subpath of the mapper's base directory. e.g. mapper.HostPath("../../foo") should fail with an error.
func (*FileMapper) HostScratchPath ¶
func (mapper *FileMapper) HostScratchPath(src string) (string, error)
func (*FileMapper) IsSubpath ¶
func (mapper *FileMapper) IsSubpath(p string, base string) bool
IsSubpath returns true if the given path "p" is a subpath of "base".
func (*FileMapper) MapTask ¶
func (mapper *FileMapper) MapTask(task *tes.Task) error
MapTask adds all the volumes, inputs, and outputs in the given Task to the FileMapper.
func (*FileMapper) OpenHostFile ¶
func (mapper *FileMapper) OpenHostFile(src string) (*os.File, error)
OpenHostFile opens a file on the host file system at a mapped path. "src" is an unmapped path. This function will handle mapping the path.
This function calls os.Open.
If the path can't be mapped or the file can't be opened, an error is returned.
type FileTaskReader ¶
type FileTaskReader struct {
// contains filtered or unexported fields
}
FileTaskReader provides a TaskReader implementation from a task file.
func NewFileTaskReader ¶
func NewFileTaskReader(path string) (*FileTaskReader, error)
NewFileTaskReader creates a new FileTaskReader.
type GenericTaskReader ¶
type GenericTaskReader struct {
// contains filtered or unexported fields
}
GenericTaskReader provides read access to tasks.
func NewGenericTaskReader ¶
func NewGenericTaskReader(get func(ctx context.Context, in *tes.GetTaskRequest) (*tes.Task, error), taskID string, close func()) *GenericTaskReader
NewGenericTaskReader returns a new generic task reader.
func (*GenericTaskReader) Close ¶
func (r *GenericTaskReader) Close()
type K8sExecutorErr ¶ added in v0.11.8
func (*K8sExecutorErr) Error ¶ added in v0.11.8
func (e *K8sExecutorErr) Error() string
type K8sSystemErr ¶ added in v0.11.8
type K8sSystemErr struct {
Reason string
Message string
Err error
// contains filtered or unexported fields
}
func (*K8sSystemErr) Error ¶ added in v0.11.8
func (e *K8sSystemErr) Error() string
func (*K8sSystemErr) Unwrap ¶ added in v0.11.8
func (e *K8sSystemErr) Unwrap() error
type KubernetesCommand ¶
type KubernetesCommand struct {
TaskId string
JobId int
StdinFile string
TaskTemplate string
Namespace string // Funnel Server Namespace
JobsNamespace string // Funnel Worker + Executor Namespace (default: Namespace)
NodeSelector map[string]string
Tolerations []map[string]interface{}
Resources *tes.Resources
ResourceLimits *tes.Resources
ServiceAccount string
NeedsPVC bool
Clientset kubernetes.Interface
Command
}
KubernetesCommand is responsible for configuring and running a task in a Kubernetes cluster.
func (KubernetesCommand) GetStderr ¶
func (kcmd KubernetesCommand) GetStderr() io.Writer
func (KubernetesCommand) GetStdout ¶
func (kcmd KubernetesCommand) GetStdout() io.Writer
func (KubernetesCommand) Run ¶
func (kcmd KubernetesCommand) Run(ctx context.Context) error
Run creates the Executor K8s job from kubernetes-executor-template.yaml. The Funnel Worker job is created in compute/kubernetes/backend.go#CreateResources.
func (KubernetesCommand) Stop ¶
func (kcmd KubernetesCommand) Stop() error
Stop deletes the job running the task.
type RPCTaskReader ¶
type RPCTaskReader struct {
// contains filtered or unexported fields
}
RPCTaskReader provides read access to tasks from the funnel server over gRPC.
func NewRPCTaskReader ¶
func NewRPCTaskReader(ctx context.Context, conf *config.RPCClient, taskID string) (*RPCTaskReader, error)
NewRPCTaskReader returns a new RPC-based task reader.
type TaskCommand ¶
type TaskReader ¶
type TaskReader interface {
Task(ctx context.Context) (*tes.Task, error)
State(ctx context.Context) (tes.State, error)
Close()
}
TaskReader is a type which reads task information during task execution.
type Volume ¶
type Volume struct {
// The path in tes worker.
HostPath string
// The path in Docker.
ContainerPath string
Readonly bool
}
Volume represents a volume mounted into a docker container. This includes a HostPath, the path on the host file system, and a ContainerPath, the path on the container file system, and whether the volume is read-only.