Documentation
¶
Index ¶
- Variables
- type CacheableNodeHandler
- type EventRecorder
- type HandlerFactory
- type Node
- type NodeExecutionContext
- type NodeExecutionContextBuilder
- type NodeExecutionMetadata
- type NodeExecutor
- type NodeHandler
- type NodePhase
- type NodeStateReader
- type NodeStateWriter
- type NodeStatus
- type SetupContext
- type TaskReader
Constants ¶
This section is empty.
Variables ¶
View Source
var NodeStatusComplete = NodeStatus{NodePhase: NodePhaseComplete}
View Source
var NodeStatusPending = NodeStatus{NodePhase: NodePhasePending}
View Source
var NodeStatusQueued = NodeStatus{NodePhase: NodePhaseQueued}
View Source
var NodeStatusRecovered = NodeStatus{NodePhase: NodePhaseRecovered}
View Source
var NodeStatusRunning = NodeStatus{NodePhase: NodePhaseRunning}
View Source
var NodeStatusSuccess = NodeStatus{NodePhase: NodePhaseSuccess}
View Source
var NodeStatusTimedOut = NodeStatus{NodePhase: NodePhaseTimedOut}
View Source
var NodeStatusUndefined = NodeStatus{NodePhase: NodePhaseUndefined}
Functions ¶
This section is empty.
Types ¶
type CacheableNodeHandler ¶ added in v1.1.117
type CacheableNodeHandler interface {
NodeHandler
// GetCatalogKey returns the unique key for the node represented by the NodeExecutionContext
GetCatalogKey(ctx context.Context, executionContext NodeExecutionContext) (catalog.Key, error)
// IsCacheable returns two booleans representing if the node represented by the
// NodeExecutionContext is cacheable and cache serializable respectively.
IsCacheable(ctx context.Context, executionContext NodeExecutionContext) (bool, bool, error)
}
CacheableNodeHandler is a node that supports caching
type EventRecorder ¶
type EventRecorder interface {
events.TaskEventRecorder
events.NodeEventRecorder
}
type HandlerFactory ¶
type HandlerFactory interface {
GetHandler(kind v1alpha1.NodeKind) (NodeHandler, error)
Setup(ctx context.Context, executor Node, setup SetupContext) error
}
type Node ¶
type Node interface {
// This method is used specifically to set inputs for start node. This is because start node does not retrieve inputs
// from predecessors, but the inputs are inputs to the workflow or inputs to the parent container (workflow) node.
SetInputsForStartNode(ctx context.Context, execContext executors.ExecutionContext, dag executors.DAGStructureWithStartNode,
nl executors.NodeLookup, inputs *core.LiteralMap) (NodeStatus, error)
// This is the main entrypoint to execute a node. It recursively depth-first goes through all ready nodes and starts their execution
// This returns either
// - 1. It finds a blocking node (not ready, or running)
// - 2. A node fails and hence the workflow will fail
// - 3. The final/end node has completed and the workflow should be stopped
RecursiveNodeHandler(ctx context.Context, execContext executors.ExecutionContext, dag executors.DAGStructure,
nl executors.NodeLookup, currentNode v1alpha1.ExecutableNode) (NodeStatus, error)
// This aborts the given node. If the given node is complete then it recursively finds the running nodes and aborts them
AbortHandler(ctx context.Context, execContext executors.ExecutionContext, dag executors.DAGStructure,
nl executors.NodeLookup, currentNode v1alpha1.ExecutableNode, reason string) error
FinalizeHandler(ctx context.Context, execContext executors.ExecutionContext, dag executors.DAGStructure,
nl executors.NodeLookup, currentNode v1alpha1.ExecutableNode) error
// This method should be used to initialize Node executor
Initialize(ctx context.Context) error
// GetNodeExecutionContextBuilder returns the current NodeExecutionContextBuilder
GetNodeExecutionContextBuilder() NodeExecutionContextBuilder
// WithNodeExecutionContextBuilder returns a new Node with the given NodeExecutionContextBuilder
WithNodeExecutionContextBuilder(NodeExecutionContextBuilder) Node
}
Core Node Executor that is used to execute a node. This is a recursive node executor and understands node dependencies
type NodeExecutionContext ¶
type NodeExecutionContext interface {
// This path is never read by propeller, but allows using some container or prefix in a specific container for all output from tasks
// Sandboxes provide exactly once execution semantics and only the successful sandbox wins. Ideally a sandbox should be a path that is
// available to the task at High Bandwidth (for example the base path of a sharded s3 bucket.
// This with a prefix based sharded strategy, could improve the throughput from S3 manifold)
RawOutputPrefix() storage.DataReference
// Sharding strategy for the output data for this node execution.
OutputShardSelector() ioutils.ShardSelector
DataStore() *storage.DataStore
InputReader() io.InputReader
EventsRecorder() EventRecorder
NodeID() v1alpha1.NodeID
Node() v1alpha1.ExecutableNode
CurrentAttempt() uint32
TaskReader() TaskReader
NodeStateReader() NodeStateReader
NodeStateWriter() NodeStateWriter
NodeExecutionMetadata() NodeExecutionMetadata
MaxDatasetSizeBytes() int64
EnqueueOwnerFunc() func() error
ContextualNodeLookup() executors.NodeLookup
ExecutionContext() executors.ExecutionContext
// TODO We should not need to pass NodeStatus, we probably only need it for DataDir, which should actually be sent using an OutputWriter interface
// Deprecated
NodeStatus() v1alpha1.ExecutableNodeStatus
}
type NodeExecutionContextBuilder ¶
type NodeExecutionContextBuilder interface {
BuildNodeExecutionContext(ctx context.Context, executionContext executors.ExecutionContext,
nl executors.NodeLookup, currentNodeID v1alpha1.NodeID) (NodeExecutionContext, error)
}
NodeExecutionContextBuilder defines how a NodeExecutionContext is built
type NodeExecutionMetadata ¶
type NodeExecutionMetadata interface {
GetOwnerID() types.NamespacedName
GetNodeExecutionID() *core.NodeExecutionIdentifier
GetNamespace() string
GetOwnerReference() v1.OwnerReference
GetLabels() map[string]string
GetAnnotations() map[string]string
GetK8sServiceAccount() string
GetSecurityContext() core.SecurityContext
IsInterruptible() bool
GetInterruptibleFailureThreshold() uint32
}
type NodeExecutor ¶
type NodeExecutor interface {
HandleNode(ctx context.Context, dag executors.DAGStructure, nCtx NodeExecutionContext, h NodeHandler) (NodeStatus, error)
Abort(ctx context.Context, h NodeHandler, nCtx NodeExecutionContext, reason string, finalTransition bool) error
Finalize(ctx context.Context, h NodeHandler, nCtx NodeExecutionContext) error
}
NodeExecutor defines the interface for handling a single Flyte Node of any Node type.
type NodeHandler ¶
type NodeHandler interface {
// Method to indicate that finalize is required for this handler
FinalizeRequired() bool
// Setup should be called, before invoking any other methods of this handler in a single thread context
Setup(ctx context.Context, setupContext SetupContext) error
// Core method that should handle this node
Handle(ctx context.Context, executionContext NodeExecutionContext) (handler.Transition, error)
// This method should be invoked to indicate the node needs to be aborted.
Abort(ctx context.Context, executionContext NodeExecutionContext, reason string) error
// This method is always called before completing the node, if FinalizeRequired returns true.
// It is guaranteed that Handle -> (happens before) -> Finalize. Abort -> finalize may be repeated multiple times
Finalize(ctx context.Context, executionContext NodeExecutionContext) error
}
Interface that should be implemented for a node type.
type NodePhase ¶
type NodePhase int
p of the node
const ( // Indicates that the node is not yet ready to be executed and is pending any previous nodes completion NodePhasePending NodePhase = iota // Indicates that the node was queued and will start running soon NodePhaseQueued // Indicates that the payload associated with this node is being executed and is not yet done NodePhaseRunning // Indicates that the nodes payload has been successfully completed, but any downstream nodes from this node may not yet have completed // We could make Success = running, but this enables more granular control NodePhaseSuccess // Complete indicates successful completion of a node. For singular nodes (nodes that have only one execution) success = complete, but, the executor // will always signal completion NodePhaseComplete // Node failed in execution, either this node or anything in the downstream chain NodePhaseFailed // Internal error observed. This state should always be accompanied with an `error`. if not the behavior is undefined NodePhaseUndefined // Finalize node failing due to timeout NodePhaseTimingOut // Node failed because execution timed out NodePhaseTimedOut // Node recovered from a prior execution. NodePhaseRecovered )
type NodeStateReader ¶
type NodeStateReader interface {
HasTaskNodeState() bool
GetTaskNodeState() handler.TaskNodeState
HasBranchNodeState() bool
GetBranchNodeState() handler.BranchNodeState
HasDynamicNodeState() bool
GetDynamicNodeState() handler.DynamicNodeState
HasWorkflowNodeState() bool
GetWorkflowNodeState() handler.WorkflowNodeState
HasGateNodeState() bool
GetGateNodeState() handler.GateNodeState
HasArrayNodeState() bool
GetArrayNodeState() handler.ArrayNodeState
}
type NodeStateWriter ¶
type NodeStateWriter interface {
PutTaskNodeState(s handler.TaskNodeState) error
PutBranchNode(s handler.BranchNodeState) error
PutDynamicNodeState(s handler.DynamicNodeState) error
PutWorkflowNodeState(s handler.WorkflowNodeState) error
PutGateNodeState(s handler.GateNodeState) error
PutArrayNodeState(s handler.ArrayNodeState) error
ClearNodeStatus()
}
type NodeStatus ¶
type NodeStatus struct {
NodePhase NodePhase
Err *core.ExecutionError
}
Helper struct to allow passing of status between functions
func NodeStatusFailed ¶
func NodeStatusFailed(err *core.ExecutionError) NodeStatus
func (*NodeStatus) HasFailed ¶
func (n *NodeStatus) HasFailed() bool
func (*NodeStatus) HasTimedOut ¶
func (n *NodeStatus) HasTimedOut() bool
func (*NodeStatus) IsComplete ¶
func (n *NodeStatus) IsComplete() bool
func (*NodeStatus) PartiallyComplete ¶
func (n *NodeStatus) PartiallyComplete() bool
type SetupContext ¶
type TaskReader ¶
type TaskReader interface {
Read(ctx context.Context) (*core.TaskTemplate, error)
GetTaskType() v1alpha1.TaskType
GetTaskID() *core.Identifier
}
Source Files
¶
Click to show internal directories.
Click to hide internal directories.