executor

package
v0.0.0-...-d76f36f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 21, 2026 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Overview

Package executor defines the Executor interface and provides implementations for running tasks via shell, Python, Docker, Spark, Hadoop, and HDFS.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DockerConfig

// DockerConfig configures the Docker executor; it is the argument accepted by
// NewDockerExecutor.
//
// NOTE(review): only Host has a documented default (the unix socket). How an
// empty APIVersion is treated is not stated here — confirm against
// NewDockerExecutor.
type DockerConfig struct {
	Host       string // Docker daemon host, defaults to unix socket
	APIVersion string // Docker API version
}

DockerConfig configures the Docker executor.

type DockerExecutor

type DockerExecutor struct {
	// contains filtered or unexported fields
}

DockerExecutor runs tasks in Docker containers.

func NewDockerExecutor

func NewDockerExecutor(cfg DockerConfig) *DockerExecutor

NewDockerExecutor creates a new Docker executor.

func (*DockerExecutor) Cancel

func (e *DockerExecutor) Cancel(ctx context.Context, taskID string) error

func (*DockerExecutor) Execute

func (e *DockerExecutor) Execute(ctx context.Context, task *dag.Task, params map[string]any) (*Result, error)

func (*DockerExecutor) Type

func (e *DockerExecutor) Type() string

func (*DockerExecutor) Validate

func (e *DockerExecutor) Validate(task *dag.Task) error

type Executor

// Executor is the interface that all task executors must implement.
//
// DockerExecutor, HDFSExecutor, HadoopExecutor, PythonExecutor, ShellExecutor
// and SparkExecutor each declare all four methods on their pointer receivers
// with exactly these signatures.
//
// NOTE(review): Registry.Get looks executors up by a type string, presumably
// the value returned by Type — confirm against Registry.Register.
type Executor interface {
	// Execute runs the task with the given parameters.
	Execute(ctx context.Context, task *dag.Task, params map[string]any) (*Result, error)

	// Cancel stops an in-progress task execution.
	Cancel(ctx context.Context, taskID string) error

	// Validate checks if the task configuration is valid for this executor.
	Validate(task *dag.Task) error

	// Type returns the executor type identifier.
	Type() string
}

Executor is the interface that all task executors must implement.

type HDFSConfig

// HDFSConfig configures the HDFS executor; it is the argument accepted by
// NewHDFSExecutor.
//
// NOTE(review): no defaults are documented for either field, so callers
// should presumably set both — confirm against NewHDFSExecutor.
type HDFSConfig struct {
	NameNodeURL string // WebHDFS namenode URL
	User        string // HDFS user
}

HDFSConfig configures the HDFS executor.

type HDFSExecutor

type HDFSExecutor struct {
	// contains filtered or unexported fields
}

HDFSExecutor performs HDFS file operations via the WebHDFS REST API.

func NewHDFSExecutor

func NewHDFSExecutor(cfg HDFSConfig) *HDFSExecutor

NewHDFSExecutor creates a new HDFS executor.

func (*HDFSExecutor) Cancel

func (e *HDFSExecutor) Cancel(_ context.Context, _ string) error

func (*HDFSExecutor) Execute

func (e *HDFSExecutor) Execute(ctx context.Context, task *dag.Task, params map[string]any) (*Result, error)

func (*HDFSExecutor) Type

func (e *HDFSExecutor) Type() string

func (*HDFSExecutor) Validate

func (e *HDFSExecutor) Validate(task *dag.Task) error

type HadoopConfig

// HadoopConfig configures the Hadoop executor; it is the argument accepted by
// NewHadoopExecutor.
//
// NOTE(review): no defaults are documented for these fields, and whether
// TimelineServerURL is required is not stated here — confirm against
// NewHadoopExecutor.
type HadoopConfig struct {
	ResourceManagerURL string // YARN ResourceManager URL
	TimelineServerURL  string // YARN Timeline Server URL
	User               string // Hadoop user
}

HadoopConfig configures the Hadoop executor.

type HadoopExecutor

type HadoopExecutor struct {
	// contains filtered or unexported fields
}

HadoopExecutor submits and manages Hadoop MapReduce jobs via the YARN REST API.

func NewHadoopExecutor

func NewHadoopExecutor(cfg HadoopConfig) *HadoopExecutor

NewHadoopExecutor creates a new Hadoop MapReduce executor.

func (*HadoopExecutor) Cancel

func (e *HadoopExecutor) Cancel(ctx context.Context, taskID string) error

func (*HadoopExecutor) Execute

func (e *HadoopExecutor) Execute(ctx context.Context, task *dag.Task, params map[string]any) (*Result, error)

func (*HadoopExecutor) Type

func (e *HadoopExecutor) Type() string

func (*HadoopExecutor) Validate

func (e *HadoopExecutor) Validate(task *dag.Task) error

type PythonConfig

// PythonConfig configures the Python executor; it is the argument accepted by
// NewPythonExecutor.
//
// NOTE(review): how VenvPath interacts with PythonPath when both are set is
// not shown here — confirm against the executor implementation.
type PythonConfig struct {
	PythonPath string // Path to python binary, defaults to "python3"
	VenvPath   string // Optional virtualenv path
}

PythonConfig configures the Python executor.

type PythonExecutor

type PythonExecutor struct {
	// contains filtered or unexported fields
}

PythonExecutor executes tasks by running Python scripts.

func NewPythonExecutor

func NewPythonExecutor(cfg PythonConfig) *PythonExecutor

NewPythonExecutor creates a new Python executor.

func (*PythonExecutor) Cancel

func (e *PythonExecutor) Cancel(_ context.Context, taskID string) error

func (*PythonExecutor) Execute

func (e *PythonExecutor) Execute(ctx context.Context, task *dag.Task, params map[string]any) (*Result, error)

func (*PythonExecutor) Type

func (e *PythonExecutor) Type() string

func (*PythonExecutor) Validate

func (e *PythonExecutor) Validate(task *dag.Task) error

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry holds available executor implementations.

func DefaultRegistry

func DefaultRegistry() *Registry

DefaultRegistry creates a registry with all built-in executors.

func NewRegistry

func NewRegistry() *Registry

NewRegistry creates a new executor registry.

func (*Registry) Get

func (r *Registry) Get(executorType string) (Executor, error)

Get returns an executor by type name.

func (*Registry) Register

func (r *Registry) Register(executor Executor)

Register adds an executor to the registry.

func (*Registry) Types

func (r *Registry) Types() []string

Types returns all registered executor types.

type Result

// Result contains the outcome of task execution. It is the value returned
// (by pointer) from Executor.Execute.
//
// Fields, with the JSON names given by the struct tags:
//   - Status ("status"): a dag.Status value for the execution.
//   - Output ("output"): raw bytes; encoding/json encodes []byte as a
//     base64 string.
//   - Error ("error"): omitted from JSON only when nil.
//   - StartTime / EndTime ("start_time" / "end_time"): time stamps,
//     presumably bracketing the execution — confirm.
//   - Metrics ("metrics"): named float64 values, omitted from JSON when empty.
//   - ExitCode ("exit_code"): integer exit code; always emitted, including 0.
//
// NOTE(review): Error is an error interface value. encoding/json encodes an
// interface by its dynamic type, and the usual error types (errors.New,
// fmt.Errorf) have no exported fields and no MarshalJSON, so a non-nil Error
// is likely to serialize as {} rather than its message. Verify, and consider
// a custom MarshalJSON or a string field if the message must survive
// serialization.
type Result struct {
	Status    dag.Status         `json:"status"`
	Output    []byte             `json:"output"`
	Error     error              `json:"error,omitempty"`
	StartTime time.Time          `json:"start_time"`
	EndTime   time.Time          `json:"end_time"`
	Metrics   map[string]float64 `json:"metrics,omitempty"`
	ExitCode  int                `json:"exit_code"`
}

Result contains the outcome of task execution.

type ShellExecutor

type ShellExecutor struct {
	// contains filtered or unexported fields
}

ShellExecutor executes tasks by running shell commands.

func NewShellExecutor

func NewShellExecutor() *ShellExecutor

NewShellExecutor creates a new shell executor.

func (*ShellExecutor) Cancel

func (e *ShellExecutor) Cancel(_ context.Context, taskID string) error

func (*ShellExecutor) Execute

func (e *ShellExecutor) Execute(ctx context.Context, task *dag.Task, params map[string]any) (*Result, error)

func (*ShellExecutor) Type

func (e *ShellExecutor) Type() string

func (*ShellExecutor) Validate

func (e *ShellExecutor) Validate(task *dag.Task) error

type SparkConfig

// SparkConfig configures the Spark executor; it is the argument accepted by
// NewSparkExecutor.
//
// NOTE(review): the struct carries both spark-submit settings (SparkHome,
// SparkSubmit, Master) and Spark Connect settings (ConnectHost, ConnectPort).
// Which of these are required, and which submission path the executor uses,
// is not visible here — confirm against NewSparkExecutor.
type SparkConfig struct {
	SparkHome   string // Path to SPARK_HOME
	SparkSubmit string // Path to spark-submit binary
	Master      string // Spark master URL
	ConnectHost string // Spark Connect server host
	ConnectPort int    // Spark Connect server port
	SparkUIURL  string // Spark UI URL for monitoring
}

SparkConfig configures the Spark executor.

type SparkExecutor

type SparkExecutor struct {
	// contains filtered or unexported fields
}

SparkExecutor submits and manages Spark jobs.

func NewSparkExecutor

func NewSparkExecutor(cfg SparkConfig) *SparkExecutor

NewSparkExecutor creates a new Spark executor.

func (*SparkExecutor) Cancel

func (e *SparkExecutor) Cancel(_ context.Context, taskID string) error

func (*SparkExecutor) Execute

func (e *SparkExecutor) Execute(ctx context.Context, task *dag.Task, params map[string]any) (*Result, error)

func (*SparkExecutor) Type

func (e *SparkExecutor) Type() string

func (*SparkExecutor) Validate

func (e *SparkExecutor) Validate(task *dag.Task) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL