Documentation ¶
Overview ¶
This package provides compiler services for Flyte workflows. It performs static analysis on the Workflow and produces CompileErrors for any detected issue. A Flyte workflow should only be considered valid for execution if it has passed through the compiler first. The intended usage of the compiler is as follows:
1) Call GetRequirements(...) and load/retrieve all tasks/workflows referenced in the response.
2) Call CompileWorkflow(...) and make sure it reports no errors.
3) Use one of the transformer packages (e.g. transformers/k8s) to build the final executable workflow.
               +-------------------+
               | start(StartNode)  |
               +-------------------+
                 |
                 | wf_input
                 v
+--------+     +-------------------+
| static | --> | node_1(TaskNode)  |
+--------+     +-------------------+
  |              |
  |              | x
  |              v
  |            +-------------------+
  +----------> | node_2(TaskNode)  |
               +-------------------+
                 |
                 | n2_output
                 v
               +-------------------+
               |   end(EndNode)    |
               +-------------------+
+-------------------+
| Workflow Id: repo |
+-------------------+
Index ¶
- func CompileTask(task *core.TaskTemplate) (*core.CompiledTask, error)
- func CompileWorkflow(primaryWf *core.WorkflowTemplate, subworkflows []*core.WorkflowTemplate, ...) (*core.CompiledWorkflowClosure, error)
- type LaunchPlanInterfaceProvider
- type LaunchPlanRefIdentifier
- type TaskIdentifier
- type WorkflowExecutionRequirements
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CompileTask ¶
func CompileTask(task *core.TaskTemplate) (*core.CompiledTask, error)
CompileTask compiles a given Task into an executable Task. It validates all required parameters and ensures the Task is well-formed.
func CompileWorkflow ¶
func CompileWorkflow(primaryWf *core.WorkflowTemplate, subworkflows []*core.WorkflowTemplate, tasks []*core.CompiledTask, launchPlans []c.InterfaceProvider) (*core.CompiledWorkflowClosure, error)
CompileWorkflow compiles a Flyte workflow and all of its dependencies into a single executable Workflow. Refer to GetRequirements() to obtain the list of launch plan and Task ids to load/compile first. It returns an executable Workflow if no errors are found, or a list of errors that must be addressed before the Workflow can be executed. Cast the error to errors.CompileErrors to inspect individual errors.
Example (Basic) ¶
inputWorkflow := &core.WorkflowTemplate{
	Id: &core.Identifier{Name: "repo"},
	Interface: &core.TypedInterface{
		Inputs:  createEmptyVariableMap(),
		Outputs: createEmptyVariableMap(),
	},
	Nodes: []*core.Node{
		{
			Id: "FirstNode",
			Target: &core.Node_TaskNode{
				TaskNode: &core.TaskNode{
					Reference: &core.TaskNode_ReferenceId{
						ReferenceId: &core.Identifier{Name: "task_123"},
					},
				},
			},
		},
	},
}

// Detect which other workflows/tasks this workflow references
subWorkflows := make([]*core.WorkflowTemplate, 0)
reqs, err := GetRequirements(inputWorkflow, subWorkflows)
if err != nil {
	fmt.Printf("failed to get requirements. Error: %v", err)
	return
}

fmt.Printf("Needed Tasks: [%v], Needed Workflows [%v]\n",
	strings.Join(dumpIdentifierNames(reqs.GetRequiredTaskIds()), ","),
	strings.Join(dumpIdentifierNames(reqs.GetRequiredLaunchPlanIds()), ","))

// Replace with logic to satisfy the requirements
workflows := make([]common.InterfaceProvider, 0)
tasks := []*core.TaskTemplate{
	{
		Id: &core.Identifier{Name: "task_123"},
		Interface: &core.TypedInterface{
			Inputs:  createEmptyVariableMap(),
			Outputs: createEmptyVariableMap(),
		},
		Target: &core.TaskTemplate_Container{
			Container: &core.Container{
				Image:   "image://",
				Command: []string{"cmd"},
				Args:    []string{"args"},
			},
		},
	},
}

// Compile all tasks before proceeding with the Workflow
compiledTasks := make([]*core.CompiledTask, 0, len(tasks))
for _, task := range tasks {
	compiledTask, err := CompileTask(task)
	if err != nil {
		fmt.Printf("failed to compile task [%v]. Error: %v", task.Id, err)
		return
	}

	compiledTasks = append(compiledTasks, compiledTask)
}

// Compilation succeeds in this example, so output is non-nil below
output, errs := CompileWorkflow(inputWorkflow, subWorkflows, compiledTasks, workflows)
fmt.Printf("Compiled Workflow in GraphViz: %v\n", visualize.ToGraphViz(output.Primary))
fmt.Printf("Compile Errors: %v\n", errs)
Output:
Needed Tasks: [task_123], Needed Workflows []
Compiled Workflow in GraphViz: digraph G {rankdir=TB;workflow[label="Workflow Id: name:"repo" "];node[style=filled];"start-node(start)" [shape=Msquare];"start-node(start)" -> "FirstNode()" [label="execution",style="dashed"];"FirstNode()" -> "end-node(end)" [label="execution",style="dashed"];}
Compile Errors: <nil>
Example (CompileErrors) ¶
inputWorkflow := &core.WorkflowTemplate{
	Id: &core.Identifier{Name: "repo"},
	Interface: &core.TypedInterface{
		Inputs:  createEmptyVariableMap(),
		Outputs: createEmptyVariableMap(),
	},
	Nodes: []*core.Node{
		{
			Target: &core.Node_TaskNode{
				TaskNode: &core.TaskNode{
					Reference: &core.TaskNode_ReferenceId{
						ReferenceId: &core.Identifier{Name: "task_123"},
					},
				},
			},
		},
	},
}

// Detect which other workflows/tasks this workflow references
subWorkflows := make([]*core.WorkflowTemplate, 0)
reqs, err := GetRequirements(inputWorkflow, subWorkflows)
if err != nil {
	fmt.Printf("Failed to get requirements. Error: %v", err)
	return
}

fmt.Printf("Needed Tasks: [%v], Needed Workflows [%v]\n",
	strings.Join(dumpIdentifierNames(reqs.GetRequiredTaskIds()), ","),
	strings.Join(dumpIdentifierNames(reqs.GetRequiredLaunchPlanIds()), ","))

// Replace with logic to satisfy the requirements.
// No compiled tasks are supplied here, so the task reference cannot be resolved.
workflows := make([]common.InterfaceProvider, 0)
_, errs := CompileWorkflow(inputWorkflow, subWorkflows, []*core.CompiledTask{}, workflows)
fmt.Printf("Compile Errors: %v\n", errs)
Output:
Needed Tasks: [task_123], Needed Workflows []
Compile Errors: Collected Errors: 1
	Error 0: Code: TaskReferenceNotFound, Node Id: start-node, Description: Referenced Task [name:"task_123" ] not found.
Example (InputsOutputsBinding) ¶
inputWorkflow := &core.WorkflowTemplate{
	Id: &core.Identifier{Name: "repo"},
	Interface: &core.TypedInterface{
		Inputs: createVariableMap(map[string]*core.Variable{
			"wf_input": {
				Type: getIntegerLiteralType(),
			},
		}),
		Outputs: createVariableMap(map[string]*core.Variable{
			"wf_output": {
				Type: getIntegerLiteralType(),
			},
		}),
	},
	Nodes: []*core.Node{
		{
			Id: "node_1",
			Target: &core.Node_TaskNode{
				TaskNode: &core.TaskNode{Reference: &core.TaskNode_ReferenceId{ReferenceId: &core.Identifier{Name: "task_123"}}},
			},
			Inputs: []*core.Binding{
				newVarBinding("", "wf_input", "x"), newIntegerBinding(124, "y"),
			},
		},
		{
			Id: "node_2",
			Target: &core.Node_TaskNode{
				TaskNode: &core.TaskNode{Reference: &core.TaskNode_ReferenceId{ReferenceId: &core.Identifier{Name: "task_123"}}},
			},
			Inputs: []*core.Binding{
				newIntegerBinding(124, "y"), newVarBinding("node_1", "x", "x"),
			},
			OutputAliases: []*core.Alias{{Var: "x", Alias: "n2_output"}},
		},
	},
	Outputs: []*core.Binding{newVarBinding("node_2", "n2_output", "wf_output")},
}

// Detect which other workflows/tasks this workflow references
subWorkflows := make([]*core.WorkflowTemplate, 0)
reqs, err := GetRequirements(inputWorkflow, subWorkflows)
if err != nil {
	fmt.Printf("Failed to get requirements. Error: %v", err)
	return
}

fmt.Printf("Needed Tasks: [%v], Needed Graphs [%v]\n",
	strings.Join(dumpIdentifierNames(reqs.GetRequiredTaskIds()), ","),
	strings.Join(dumpIdentifierNames(reqs.GetRequiredLaunchPlanIds()), ","))

// Replace with logic to satisfy the requirements
graphs := make([]common.InterfaceProvider, 0)
inputTasks := []*core.TaskTemplate{
	{
		Id:       &core.Identifier{Name: "task_123"},
		Metadata: &core.TaskMetadata{},
		Interface: &core.TypedInterface{
			Inputs: createVariableMap(map[string]*core.Variable{
				"x": {
					Type: getIntegerLiteralType(),
				},
				"y": {
					Type: getIntegerLiteralType(),
				},
			}),
			Outputs: createVariableMap(map[string]*core.Variable{
				"x": {
					Type: getIntegerLiteralType(),
				},
			}),
		},
		Target: &core.TaskTemplate_Container{
			Container: &core.Container{
				Image:   "image://",
				Command: []string{"cmd"},
				Args:    []string{"args"},
			},
		},
	},
}

// Compile all tasks before proceeding with the Workflow
compiledTasks := make([]*core.CompiledTask, 0, len(inputTasks))
for _, task := range inputTasks {
	compiledTask, err := CompileTask(task)
	if err != nil {
		fmt.Printf("Failed to compile task [%v]. Error: %v", task.Id, err)
		return
	}

	compiledTasks = append(compiledTasks, compiledTask)
}

output, errs := CompileWorkflow(inputWorkflow, subWorkflows, compiledTasks, graphs)
if errs != nil {
	fmt.Printf("Compile Errors: %v\n", errs)
} else {
	fmt.Printf("Compiled Workflow in GraphViz: %v\n", visualize.ToGraphViz(output.Primary))
}
Output:
Needed Tasks: [task_123], Needed Graphs []
Compiled Workflow in GraphViz: digraph G {rankdir=TB;workflow[label="Workflow Id: name:"repo" "];node[style=filled];"start-node(start)" [shape=Msquare];"start-node(start)" -> "node_1()" [label="wf_input",style="solid"];"node_1()" -> "node_2()" [label="x",style="solid"];"static" -> "node_1()" [label=""];"node_2()" -> "end-node(end)" [label="n2_output",style="solid"];"static" -> "node_2()" [label=""];}
Types ¶
type LaunchPlanInterfaceProvider ¶ added in v0.2.25
type LaunchPlanInterfaceProvider struct {
// contains filtered or unexported fields
}
LaunchPlanInterfaceProvider is meant to satisfy github.com/lyft/flytepropeller/pkg/compiler/common.InterfaceProvider. This file is largely copied from Admin (github-dot-com/lyft/flyteadmin/blob/1acce744b8c7839ab77a0eb1ed922905af15baa5/pkg/workflowengine/impl/interface_provider.go; the link is written this way because a real link made go mod import Admin), but that implementation relies on the internal Admin Gorm model. We should consider deprecating that one in favor of this one, as Admin already has a dependency on the Propeller compiler.
func NewLaunchPlanInterfaceProvider ¶ added in v0.2.25
func NewLaunchPlanInterfaceProvider(launchPlan admin.LaunchPlan) *LaunchPlanInterfaceProvider
func (*LaunchPlanInterfaceProvider) GetExpectedInputs ¶ added in v0.2.25
func (p *LaunchPlanInterfaceProvider) GetExpectedInputs() *core.ParameterMap
func (*LaunchPlanInterfaceProvider) GetExpectedOutputs ¶ added in v0.2.25
func (p *LaunchPlanInterfaceProvider) GetExpectedOutputs() *core.VariableMap
func (*LaunchPlanInterfaceProvider) GetID ¶ added in v0.2.25
func (p *LaunchPlanInterfaceProvider) GetID() *core.Identifier
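To illustrate what satisfying an interface provider involves, here is a minimal sketch built on the three getters listed above. Everything in it is a local stand-in: the Identifier, ParameterMap and VariableMap structs are simplified placeholders for the core types, the InterfaceProvider interface is assumed to consist of exactly these three methods, and staticProvider is a hypothetical implementation.

```go
package main

import "fmt"

// Identifier, ParameterMap and VariableMap are simplified stand-ins for the
// core types named in the signatures above.
type Identifier struct{ Name string }
type ParameterMap struct{ Parameters map[string]string }
type VariableMap struct{ Variables map[string]string }

// InterfaceProvider is a local stand-in for common.InterfaceProvider,
// assumed here to consist of exactly the three getters listed above.
type InterfaceProvider interface {
	GetID() *Identifier
	GetExpectedInputs() *ParameterMap
	GetExpectedOutputs() *VariableMap
}

// staticProvider is a hypothetical provider backed by fixed values.
type staticProvider struct {
	id      *Identifier
	inputs  *ParameterMap
	outputs *VariableMap
}

// Compile-time check that *staticProvider satisfies the interface.
var _ InterfaceProvider = (*staticProvider)(nil)

func (p *staticProvider) GetID() *Identifier               { return p.id }
func (p *staticProvider) GetExpectedInputs() *ParameterMap { return p.inputs }
func (p *staticProvider) GetExpectedOutputs() *VariableMap { return p.outputs }

// newStaticProvider builds a provider with empty input and output maps.
func newStaticProvider(name string) *staticProvider {
	return &staticProvider{
		id:      &Identifier{Name: name},
		inputs:  &ParameterMap{Parameters: map[string]string{}},
		outputs: &VariableMap{Variables: map[string]string{}},
	}
}

func main() {
	// A slice like this plays the role of the launchPlans argument.
	providers := []InterfaceProvider{newStaticProvider("my_launch_plan")}
	for _, p := range providers {
		fmt.Println(p.GetID().Name)
	}
}
```

The blank-identifier assignment is a common Go idiom: it fails the build if the method set drifts from the interface, without adding any runtime cost.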
type LaunchPlanRefIdentifier ¶
type LaunchPlanRefIdentifier = common.Identifier
type TaskIdentifier ¶
type TaskIdentifier = common.Identifier
type WorkflowExecutionRequirements ¶
type WorkflowExecutionRequirements struct {
// contains filtered or unexported fields
}
Represents the set of resources required for a given Workflow's execution. All of the resources should be loaded beforehand and passed to the compiler.
func GetRequirements ¶
func GetRequirements(fg *core.WorkflowTemplate, subWfs []*core.WorkflowTemplate) (reqs WorkflowExecutionRequirements, err error)
Computes requirements for a given Workflow.
func (WorkflowExecutionRequirements) GetRequiredLaunchPlanIds ¶
func (g WorkflowExecutionRequirements) GetRequiredLaunchPlanIds() []LaunchPlanRefIdentifier
Gets a slice of required launch plan ids to load.
func (WorkflowExecutionRequirements) GetRequiredTaskIds ¶
func (g WorkflowExecutionRequirements) GetRequiredTaskIds() []TaskIdentifier
Gets a slice of required Task ids to load.
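The idea behind computing requirements can be sketched as a walk over the workflow's nodes that collects each referenced id once. This is an illustration under simplifying assumptions, not the package's implementation: the node and requirements types and the collectRequirements function are hypothetical, and ids are reduced to plain strings.

```go
package main

import "fmt"

// node is a hypothetical, simplified workflow node that references at most
// one task and at most one launch plan by name.
type node struct {
	ID            string
	TaskRef       string
	LaunchPlanRef string
}

// requirements is a hypothetical stand-in for WorkflowExecutionRequirements.
type requirements struct {
	taskIDs       []string
	launchPlanIDs []string
}

// collectRequirements gathers every distinct task and launch plan reference,
// preserving the order in which each id is first seen.
func collectRequirements(nodes []node) requirements {
	var reqs requirements
	seenTasks := map[string]bool{}
	seenLaunchPlans := map[string]bool{}
	for _, n := range nodes {
		if n.TaskRef != "" && !seenTasks[n.TaskRef] {
			seenTasks[n.TaskRef] = true
			reqs.taskIDs = append(reqs.taskIDs, n.TaskRef)
		}
		if n.LaunchPlanRef != "" && !seenLaunchPlans[n.LaunchPlanRef] {
			seenLaunchPlans[n.LaunchPlanRef] = true
			reqs.launchPlanIDs = append(reqs.launchPlanIDs, n.LaunchPlanRef)
		}
	}
	return reqs
}

func main() {
	// Two nodes reference the same task, as in Example (InputsOutputsBinding).
	reqs := collectRequirements([]node{
		{ID: "node_1", TaskRef: "task_123"},
		{ID: "node_2", TaskRef: "task_123"},
	})
	fmt.Println(reqs.taskIDs, reqs.launchPlanIDs)
}
```

Deduplicating by id is what lets a workflow with several nodes pointing at the same task report that task only once, which matches the "Needed Tasks: [task_123]" line in the example outputs.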
Source Files ¶
Directories ¶
| Path | Synopsis |
|---|---|
| | This package defines the intermediate layer that the compiler builds and transformers accept. |
| | This package is a central repository of all compile errors that can be reported. |
| transformers | |
| transformers/k8s | This package converts the output of the compiler into a K8s resource for propeller to execute. |
| | This package contains validators for all elements of the workflow spec (node, task, branch, interface, bindings... |