worker

package
v0.12.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 23, 2026 License: MIT Imports: 38 Imported by: 2

Documentation

Overview

Package worker contains code which executes a task.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DownloadInputs

func DownloadInputs(pctx context.Context, inputs []*tes.Input, store storage.Storage, ev *events.TaskWriter, parallelLimit int) error

DownloadInputs downloads the given inputs.

func FlattenInputs

func FlattenInputs(ctx context.Context, inputs []*tes.Input, store storage.Storage, ev *events.TaskWriter) ([]*tes.Input, error)

FlattenInputs flattens any directory inputs into a list of file inputs. A warning event will be generated if an input directory is empty.

func FlattenOutputs

func FlattenOutputs(ctx context.Context, outputs []*tes.Output, store storage.Storage, ev *events.TaskWriter) ([]*tes.Output, error)

FlattenOutputs flattens output directories into a list of files. A warning event will be generated if an output directory is empty.

func UploadOutputs

func UploadOutputs(ctx context.Context, outputs []*tes.Output, store storage.Storage, ev *events.TaskWriter, parallelLimit int) ([]*tes.OutputFileLog, error)

UploadOutputs uploads the outputs.

Types

type Base64TaskReader

type Base64TaskReader struct {
	// contains filtered or unexported fields
}

Base64TaskReader reads a task from a base64 encoded string.

func NewBase64TaskReader

func NewBase64TaskReader(raw string) (*Base64TaskReader, error)

NewBase64TaskReader creates a new Base64TaskReader.

func (*Base64TaskReader) Close

func (f *Base64TaskReader) Close()

Close closes the Base64TaskReader.

func (*Base64TaskReader) State

func (f *Base64TaskReader) State(ctx context.Context) (tes.State, error)

State returns the task state. Due to some quirks in the implementation of this reader, and since there is no online database to connect to, this will always return QUEUED.

func (*Base64TaskReader) Task

func (f *Base64TaskReader) Task(ctx context.Context) (*tes.Task, error)

Task returns the task. A random ID will be generated.

type Command

type Command struct {
	Image        string
	ShellCommand []string
	Volumes      []Volume
	Workdir      string
	Env          map[string]string
	Stdin        io.Reader
	Stdout       io.Writer
	Stderr       io.Writer
	Event        *events.ExecutorWriter
	TaskCommand
}

func (*Command) GetStderr

func (c *Command) GetStderr() io.Writer

func (*Command) GetStdout

func (c *Command) GetStdout() io.Writer

func (*Command) SetStderr

func (c *Command) SetStderr(w io.Writer)

func (*Command) SetStdin

func (c *Command) SetStdin(r io.Reader)

func (*Command) SetStdout

func (c *Command) SetStdout(w io.Writer)

type DefaultWorker

type DefaultWorker struct {
	Executor    Executor
	Conf        *config.Worker
	Store       storage.Storage
	TaskReader  TaskReader
	EventWriter events.Writer
	Command
}

DefaultWorker is the default task worker, which follows a basic, sequential process of task initialization, execution, finalization, and logging.

func (*DefaultWorker) Close

func (r *DefaultWorker) Close()

func (*DefaultWorker) Run

func (r *DefaultWorker) Run(pctx context.Context) (runerr error)

Run runs the Worker.

type DockerCommand

type DockerCommand struct {
	Id              string
	Name            string
	Volumes         []Volume
	Workdir         string
	RemoveContainer bool
	Event           *events.ExecutorWriter
	DriverCommand   string
	RunCommand      string // template string
	PullCommand     string // template string
	StopCommand     string // template string
	EnableTags      bool
	Tags            map[string]string
	Command
}

DockerCommand is responsible for configuring and running a docker container.

func (DockerCommand) GetImage

func (docker DockerCommand) GetImage() string

func (*DockerCommand) GetStderr

func (docker *DockerCommand) GetStderr() io.Writer

func (*DockerCommand) GetStdout

func (docker *DockerCommand) GetStdout() io.Writer

func (*DockerCommand) InspectContainer

func (docker *DockerCommand) InspectContainer(ctx context.Context) DockerCommand

InspectContainer inspects the docker container for metadata.

func (DockerCommand) Run

func (docker DockerCommand) Run(ctx context.Context) error

Run runs the Docker command and blocks until done.

func (DockerCommand) Stop

func (docker DockerCommand) Stop() error

Stop stops the container.

func (*DockerCommand) SyncAPIVersion

func (docker *DockerCommand) SyncAPIVersion() error

SyncAPIVersion ensures that the client uses the same API version as the server.

type DockerVersion

type DockerVersion struct {
	Client string
	Server string
}

type Executor

type Executor struct {
	// "docker" or "kubernetes"
	Backend string
	// Kubernetes executor template
	Template string
	// Kubernetes persistent volume template
	PVTemplate string
	// Kubernetes persistent volume claim template
	PVCTemplate string
	// Funnel Server namespace
	Namespace string
	// Funnel Worker + Executor namespace
	JobsNamespace string
	// Kubernetes service account name
	ServiceAccount string
	// Kubernetes service account template
	ServiceAccountTemplate string
	// Kubernetes role template
	RoleTemplate string
	// Kubernetes role binding template
	RoleBindingTemplate string
	// NodeSelector for scheduling jobs onto specific nodes
	NodeSelector map[string]string
	// Tolerations for scheduling jobs onto specific nodes
	Tolerations []map[string]interface{}
	// Resources specifies the default resource requirements for Kubernetes jobs.
	Resources *config.KubernetesResources
}

Executor is the configuration of the task executor.

type FileMapper

type FileMapper struct {
	Volumes    []Volume
	Inputs     []*tes.Input
	Outputs    []*tes.Output
	WorkDir    string
	ScratchDir string
}

FileMapper is responsible for mapping paths into a working directory on the worker's host file system.

Every task needs its own directory to work in. When a file is downloaded for a task, it needs to be stored in the task's working directory. The same applies to task outputs, uploads, stdin/out/err, etc. FileMapper helps the worker engine manage all these paths.

func NewFileMapper

func NewFileMapper(dir string) *FileMapper

NewFileMapper returns a new FileMapper, which maps files into the given base directory.

func (*FileMapper) AddInput

func (mapper *FileMapper) AddInput(input *tes.Input) error

AddInput adds an input to the mapped files for the given tes.Input. A copy of the tes.Input will be added to mapper.Inputs, with the "Path" field updated to the mapped host path.

If the path can't be mapped, an error is returned.

func (*FileMapper) AddOutput

func (mapper *FileMapper) AddOutput(output *tes.Output) error

AddOutput adds an output to the mapped files for the given tes.Output. A copy of the tes.Output will be added to mapper.Outputs, with the "Path" field updated to the mapped host path.

If the path can't be mapped, an error is returned.

func (*FileMapper) AddTmpVolume

func (mapper *FileMapper) AddTmpVolume(mountPoint string) error

AddTmpVolume creates a directory on the host based on the declared path in the container and adds it to mapper.Volumes.

If the path can't be mapped, an error is returned.

func (*FileMapper) AddVolume

func (mapper *FileMapper) AddVolume(hostPath string, mountPoint string, readonly bool) error

AddVolume adds a mapped volume to the mapper. A corresponding Volume record is added to mapper.Volumes.

If the volume paths are invalid or can't be mapped, an error is returned.

func (*FileMapper) Cleanup

func (mapper *FileMapper) Cleanup() error

Cleanup deletes the working directory.

func (*FileMapper) ContainerPath

func (mapper *FileMapper) ContainerPath(src string) string

ContainerPath returns an unmapped path.

The mapper's base dir is stripped from the path. e.g. If the mapper is configured with a base dir of "/tmp/mapped_files", then mapper.ContainerPath("/tmp/mapped_files/home/ubuntu/myfile") will return "/home/ubuntu/myfile".

func (*FileMapper) CopyInputsToScratch

func (mapper *FileMapper) CopyInputsToScratch(scratchDir string) error

func (*FileMapper) CopyOutputsToWorkDir

func (mapper *FileMapper) CopyOutputsToWorkDir(scratchDir string) error

func (*FileMapper) CreateHostFile

func (mapper *FileMapper) CreateHostFile(src string) (*os.File, error)

CreateHostFile creates a file on the host file system at a mapped path. "src" is an unmapped path. This function will handle mapping the path.

This function calls os.Create

If the path can't be mapped or the file can't be created, an error is returned.

func (*FileMapper) HostPath

func (mapper *FileMapper) HostPath(src string) (string, error)

HostPath returns a mapped path.

The path is concatenated to the mapper's base dir. e.g. If the mapper is configured with a base dir of "/tmp/mapped_files", then mapper.HostPath("/home/ubuntu/myfile") will return "/tmp/mapped_files/home/ubuntu/myfile".

The mapped path is required to be a subpath of the mapper's base directory. e.g. mapper.HostPath("../../foo") should fail with an error.

func (*FileMapper) HostScratchPath

func (mapper *FileMapper) HostScratchPath(src string) (string, error)

func (*FileMapper) IsSubpath

func (mapper *FileMapper) IsSubpath(p string, base string) bool

IsSubpath returns true if the given path "p" is a subpath of "base".

func (*FileMapper) MapTask

func (mapper *FileMapper) MapTask(task *tes.Task) error

MapTask adds all the volumes, inputs, and outputs in the given Task to the FileMapper.

func (*FileMapper) OpenHostFile

func (mapper *FileMapper) OpenHostFile(src string) (*os.File, error)

OpenHostFile opens a file on the host file system at a mapped path. "src" is an unmapped path. This function will handle mapping the path.

This function calls os.Open

If the path can't be mapped or the file can't be opened, an error is returned.

type FileTaskReader

type FileTaskReader struct {
	// contains filtered or unexported fields
}

FileTaskReader provides a TaskReader implementation from a task file.

func NewFileTaskReader

func NewFileTaskReader(path string) (*FileTaskReader, error)

NewFileTaskReader creates a new FileTaskReader.

func (*FileTaskReader) Close

func (f *FileTaskReader) Close()

Close closes the FileTaskReader.

func (*FileTaskReader) State

func (f *FileTaskReader) State(ctx context.Context) (tes.State, error)

State returns the task state. Due to some quirks in the implementation of this reader, and since there is no online database to connect to, this will always return QUEUED.

func (*FileTaskReader) Task

func (f *FileTaskReader) Task(ctx context.Context) (*tes.Task, error)

Task returns the task. A random ID will be generated.

type GenericTaskReader

type GenericTaskReader struct {
	// contains filtered or unexported fields
}

GenericTaskReader provides read access to tasks.

func NewGenericTaskReader

func NewGenericTaskReader(get func(ctx context.Context, in *tes.GetTaskRequest) (*tes.Task, error), taskID string, close func()) *GenericTaskReader

NewGenericTaskReader returns a new generic task reader.

func (*GenericTaskReader) Close

func (r *GenericTaskReader) Close()

func (*GenericTaskReader) State

func (r *GenericTaskReader) State(ctx context.Context) (tes.State, error)

State returns the current state of the task.

func (*GenericTaskReader) Task

func (r *GenericTaskReader) Task(ctx context.Context) (*tes.Task, error)

Task returns the task descriptor.

type K8sExecutorErr added in v0.11.8

type K8sExecutorErr struct {
	ExitCode int
	Reason   string
	Message  string
	JobName  string
}

func (*K8sExecutorErr) Error added in v0.11.8

func (e *K8sExecutorErr) Error() string

type K8sSystemErr added in v0.11.8

type K8sSystemErr struct {
	Reason  string
	Message string
	Err     error
	// contains filtered or unexported fields
}

func (*K8sSystemErr) Error added in v0.11.8

func (e *K8sSystemErr) Error() string

func (*K8sSystemErr) Unwrap added in v0.11.8

func (e *K8sSystemErr) Unwrap() error

type KubernetesCommand

type KubernetesCommand struct {
	TaskId         string
	JobId          int
	StdinFile      string
	TaskTemplate   string
	Namespace      string // Funnel Server Namespace
	JobsNamespace  string // Funnel Worker + Executor Namespace (default: Namespace)
	NodeSelector   map[string]string
	Tolerations    []map[string]interface{}
	Resources      *tes.Resources
	ResourceLimits *tes.Resources
	ServiceAccount string
	NeedsPVC       bool
	Clientset      kubernetes.Interface
	Command
}

KubernetesCommand is responsible for configuring and running a task in a Kubernetes cluster.

func (KubernetesCommand) GetStderr

func (kcmd KubernetesCommand) GetStderr() io.Writer

func (KubernetesCommand) GetStdout

func (kcmd KubernetesCommand) GetStdout() io.Writer

func (KubernetesCommand) Run

func (kcmd KubernetesCommand) Run(ctx context.Context) error

Run creates the Executor K8s job from kubernetes-executor-template.yaml. The Funnel Worker job is created in compute/kubernetes/backend.go#CreateResources.

func (KubernetesCommand) Stop

func (kcmd KubernetesCommand) Stop() error

Stop deletes the job running the task.

type RPCTaskReader

type RPCTaskReader struct {
	// contains filtered or unexported fields
}

RPCTaskReader provides read access to tasks from the funnel server over gRPC.

func NewRPCTaskReader

func NewRPCTaskReader(ctx context.Context, conf *config.RPCClient, taskID string) (*RPCTaskReader, error)

NewRPCTaskReader returns a new RPC-based task reader.

func (*RPCTaskReader) Close

func (r *RPCTaskReader) Close()

Close closes the connection.

func (*RPCTaskReader) State

func (r *RPCTaskReader) State(ctx context.Context) (tes.State, error)

State returns the current state of the task.

func (*RPCTaskReader) Task

func (r *RPCTaskReader) Task(ctx context.Context) (*tes.Task, error)

Task returns the task descriptor.

type TaskCommand

type TaskCommand interface {
	Run(context.Context) error
	Stop() error
	GetStdout() io.Writer
	GetStderr() io.Writer
	SetStdout(io.Writer)
	SetStderr(io.Writer)
	SetStdin(io.Reader)
}

type TaskReader

type TaskReader interface {
	Task(ctx context.Context) (*tes.Task, error)
	State(ctx context.Context) (tes.State, error)
	Close()
}

TaskReader is a type which reads task information during task execution.

type Volume

type Volume struct {
	// The path in tes worker.
	HostPath string
	// The path in Docker.
	ContainerPath string
	Readonly      bool
}

Volume represents a volume mounted into a docker container. This includes a HostPath (the path on the host file system), a ContainerPath (the path on the container file system), and whether the volume is read-only.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL