Documentation
Overview
Package workflow defines how to represent physical plans as distributed workflows.
Index
- Variables
- func Fprint(w io.Writer, wf *Workflow) error
- func Sprint(wf *Workflow) string
- type ContributingTimeRange
- type Manifest
- type Options
- type RecordWriter
- type Runner
- type Stream
- type StreamEventHandler
- type StreamState
- type Task
- type TaskEventHandler
- type TaskState
- type TaskStatus
- type Workflow
Constants
This section is empty.
Variables
var TaskStates = [...]string{
	"Created",
	"Pending",
	"Running",
	"Completed",
	"Cancelled",
	"Failed",
}
Functions
Types
type ContributingTimeRange
type ContributingTimeRange struct {
	// End of the range
	Timestamp time.Time
	// Less than Timestamp
	LessThan bool
}
ContributingTimeRange represents a time range of input data that can change the current state of a running task. Anything outside of this range cannot meaningfully contribute to the task state.
type Manifest
type Manifest struct {
	ID     ulid.ULID // ID of the manifest.
	Tenant string    // Tenant that this manifest is associated with, if any.
	Actor  []string  // Path to the actor that generated this manifest.
	// Streams are the collection of streams within a manifest.
	Streams []*Stream
	// Tasks are the collection of Tasks within a manifest. Tasks only reference
	// Streams within the same manifest.
	Tasks []*Task
	StreamEventHandler StreamEventHandler // Handler for stream events.
	TaskEventHandler   TaskEventHandler   // Handler for task events.
}
A Manifest is a collection of related Tasks and Streams. A manifest must be registered with a Runner before its tasks can run.
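The rule that tasks only reference streams within the same manifest can be checked before a manifest is handed to a Runner. A minimal sketch, assuming simplified stand-in types: string IDs instead of ulid.ULID and string node names instead of physical.Node. validateManifest is a hypothetical helper, not a package function.

```go
package main

import "fmt"

// Simplified stand-ins for the package types. Node keys are plain strings
// here instead of physical.Node, and IDs are strings instead of ulid.ULID.
type Stream struct{ ID string }

type Task struct {
	ID      string
	Sources map[string][]*Stream
	Sinks   map[string][]*Stream
}

type Manifest struct {
	Streams []*Stream
	Tasks   []*Task
}

// validateManifest checks the documented rule: tasks may only reference
// streams that are listed in the same manifest.
func validateManifest(m *Manifest) error {
	known := make(map[*Stream]bool, len(m.Streams))
	for _, s := range m.Streams {
		known[s] = true
	}
	for _, t := range m.Tasks {
		// Check both the streams a task reads from and the ones it writes to.
		for _, refs := range []map[string][]*Stream{t.Sources, t.Sinks} {
			for node, streams := range refs {
				for _, s := range streams {
					if !known[s] {
						return fmt.Errorf("task %s node %s references stream %s outside the manifest", t.ID, node, s.ID)
					}
				}
			}
		}
	}
	return nil
}

func main() {
	s1, s2 := &Stream{ID: "s1"}, &Stream{ID: "s2"}
	producer := &Task{ID: "t1", Sinks: map[string][]*Stream{"scan": {s1}}}
	consumer := &Task{ID: "t2", Sources: map[string][]*Stream{"merge": {s2}}}
	m := &Manifest{Streams: []*Stream{s1}, Tasks: []*Task{producer, consumer}}

	// s2 is not listed in m.Streams, so this reports an error for t2.
	fmt.Println(validateManifest(m))
}
```

Streams are compared by pointer identity, which matches how Task.Sources and Task.Sinks hold *Stream values.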
type Options
type Options struct {
	Tenant string   // Tenant ID associated with the workflow.
	Actor  []string // Optional path to the actor that is generating the workflow.
	// MaxRunningScanTasks specifies the maximum number of scan tasks that may
	// run concurrently within a single workflow. 0 means no limit.
	MaxRunningScanTasks int
	// MaxRunningOtherTasks specifies the maximum number of non-scan tasks that
	// may run concurrently within a single workflow. 0 means no limit.
	MaxRunningOtherTasks int
	// DebugTasks toggles debug messages for tasks. This is very verbose and
	// should only be enabled for debugging purposes.
	//
	// Regardless of the value of DebugTasks, workers still log when
	// they start and finish assigned tasks.
	DebugTasks bool
	// DebugStreams toggles debug messages for data streams. This is very
	// verbose and should only be enabled for debugging purposes.
	DebugStreams bool
}
Options configures a Workflow.
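The concurrency limits in Options use 0 to mean "no limit". One way a scheduler could honour such a limit is a buffered channel used as a counting semaphore, with a nil channel standing in for "unlimited". This is a sketch under that assumption; newLimiter, runAll, and the mapping to MaxRunningScanTasks and MaxRunningOtherTasks are illustrative and not taken from the package.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// limiter caps concurrent work. A nil slots channel means "no limit".
type limiter struct{ slots chan struct{} }

// newLimiter treats max <= 0 as unlimited, mirroring "0 means no limit".
func newLimiter(max int) *limiter {
	if max <= 0 {
		return &limiter{}
	}
	return &limiter{slots: make(chan struct{}, max)}
}

func (l *limiter) acquire() {
	if l.slots != nil {
		l.slots <- struct{}{} // blocks while max tasks are running
	}
}

func (l *limiter) release() {
	if l.slots != nil {
		<-l.slots
	}
}

// runAll starts n goroutines through the limiter and returns the highest
// number observed running at the same time.
func runAll(l *limiter, n int) int64 {
	var running, peak int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			l.acquire()
			defer l.release()
			cur := atomic.AddInt64(&running, 1)
			// Raise peak to cur if cur is larger (lock-free max).
			for {
				old := atomic.LoadInt64(&peak)
				if cur <= old || atomic.CompareAndSwapInt64(&peak, old, cur) {
					break
				}
			}
			atomic.AddInt64(&running, -1)
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	scan := newLimiter(2)  // as if MaxRunningScanTasks were 2
	other := newLimiter(0) // as if MaxRunningOtherTasks were 0 (no limit)
	fmt.Println("scan peak <= 2:", runAll(scan, 10) <= 2)
	fmt.Println("other peak:", runAll(other, 10))
}
```

Keeping separate limiters for scan and non-scan tasks lets one kind of task saturate its budget without starving the other.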
type RecordWriter
type RecordWriter interface {
	// Write writes a new record to the stream.
	Write(ctx context.Context, record arrow.RecordBatch) error
}
A RecordWriter is used to write records to a stream.
type Runner
type Runner interface {
	// RegisterManifest registers a Manifest to use with the runner. Registering
	// a manifest records all of the streams and tasks inside of it for use.
	//
	// Tasks within the manifest must only reference streams within the same
	// manifest.
	//
	// RegisterManifest returns an error if a stream or task is already
	// associated with a different manifest.
	//
	// The handlers defined by the manifest will be invoked whenever the
	// corresponding resources of the manifest change state.
	RegisterManifest(ctx context.Context, manifest *Manifest) error
	// UnregisterManifest unregisters a Manifest from the runner. Unregistering
	// a manifest forcibly cancels any tasks associated with it.
	//
	// UnregisterManifest returns an error if the manifest is not registered or
	// if it contains unregistered streams or tasks.
	UnregisterManifest(ctx context.Context, manifest *Manifest) error
	// Listen binds the stream to write to the provided writer. Listen returns
	// an error if the stream was not defined in a registered manifest, or if a
	// task is already bound to the stream as a reader.
	Listen(ctx context.Context, writer RecordWriter, stream *Stream) error
	// Start begins executing the provided tasks in the background. Start
	// returns an error if any of the Tasks were not registered through a
	// manifest.
	//
	// The provided context is used for the lifetime of the Start call. To
	// cancel started tasks, use [Runner.Cancel].
	Start(ctx context.Context, tasks ...*Task) error
	// Cancel requests cancellation of the specified tasks. Cancel returns an
	// error if any of the tasks were not found.
	Cancel(ctx context.Context, tasks ...*Task) error
}
A Runner can asynchronously execute a workflow.
type Stream
type Stream struct {
	// ULID is a unique identifier of the Stream.
	ULID ulid.ULID
	// TenantID is the tenant associated with this stream.
	TenantID string
}
A Stream is an abstract representation of how data flows across Task boundaries. Each Stream has exactly one sender (a Task) and exactly one receiver (either another Task or the owning Workflow).
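The one-sender, one-receiver invariant can be expressed as a count over task sinks, task sources, and the streams the workflow itself listens to. A sketch with simplified stand-in types (Sources and Sinks as flat slices rather than maps keyed by physical.Node); checkStreams is a hypothetical helper.

```go
package main

import "fmt"

type Stream struct{ ID string }

// Task keeps only the stream references; node keys are omitted for brevity.
type Task struct {
	ID      string
	Sources []*Stream // streams this task reads
	Sinks   []*Stream // streams this task writes
}

// checkStreams verifies that every stream has exactly one sending task and
// exactly one receiver, which is either a task or the workflow (listened).
func checkStreams(streams []*Stream, tasks []*Task, listened map[*Stream]bool) error {
	senders := map[*Stream]int{}
	receivers := map[*Stream]int{}
	for _, t := range tasks {
		for _, s := range t.Sinks {
			senders[s]++
		}
		for _, s := range t.Sources {
			receivers[s]++
		}
	}
	for s, ok := range listened {
		if ok {
			receivers[s]++
		}
	}
	for _, s := range streams {
		if senders[s] != 1 {
			return fmt.Errorf("stream %s has %d senders, want 1", s.ID, senders[s])
		}
		if receivers[s] != 1 {
			return fmt.Errorf("stream %s has %d receivers, want 1", s.ID, receivers[s])
		}
	}
	return nil
}

func main() {
	a, out := &Stream{ID: "a"}, &Stream{ID: "out"}
	scan := &Task{ID: "scan", Sinks: []*Stream{a}}
	merge := &Task{ID: "merge", Sources: []*Stream{a}, Sinks: []*Stream{out}}

	// "out" is received by the workflow itself rather than by another task.
	fmt.Println(checkStreams([]*Stream{a, out}, []*Task{scan, merge}, map[*Stream]bool{out: true})) // <nil>
}
```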
type StreamEventHandler
type StreamEventHandler func(ctx context.Context, s *Stream, newState StreamState)
StreamEventHandler is a function that handles events for changed streams.
type StreamState
type StreamState int
StreamState represents the state of a stream. It is sent as an event by a Runner whenever a stream associated with a task changes its state.
The zero value of StreamState, StreamStateIdle, represents an inactive stream.
const (
	// StreamStateIdle represents a stream that is waiting for both the sender
	// and receiver to be available.
	StreamStateIdle StreamState = iota
	// StreamStateOpen represents a stream that is open and transmitting data.
	StreamStateOpen
	// StreamStateBlocked represents a stream that is blocked (by backpressure)
	// on sending data.
	StreamStateBlocked
	// StreamStateClosed represents a stream that is closed and no longer
	// transmitting data.
	StreamStateClosed
)
func (StreamState) String
func (s StreamState) String() string
String returns a string representation of the StreamState.
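A String method on an iota-based enum is typically a switch over the constants with a fallback for unknown values. The strings returned below are assumptions, since this page does not list the output of StreamState.String; the sketch also shows that the zero value prints as the idle state.

```go
package main

import "fmt"

type StreamState int

const (
	StreamStateIdle StreamState = iota
	StreamStateOpen
	StreamStateBlocked
	StreamStateClosed
)

// String is a sketch only; the names returned by the real method are not
// documented here, so these strings are assumptions.
func (s StreamState) String() string {
	switch s {
	case StreamStateIdle:
		return "Idle"
	case StreamStateOpen:
		return "Open"
	case StreamStateBlocked:
		return "Blocked"
	case StreamStateClosed:
		return "Closed"
	default:
		// Convert to int so Sprintf does not call String recursively.
		return fmt.Sprintf("StreamState(%d)", int(s))
	}
}

func main() {
	var s StreamState                                            // zero value
	fmt.Println(s)                                               // Idle
	fmt.Println(StreamState(7))                                  // StreamState(7)
	fmt.Printf("%v -> %v\n", StreamStateOpen, StreamStateClosed) // Open -> Closed
}
```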
type Task
type Task struct {
	// ULID is a unique identifier of the Task.
	ULID ulid.ULID
	// TenantID is the tenant associated with this task.
	TenantID string
	// Fragment is the local physical plan that this Task represents.
	Fragment *physical.Plan
	// Sources defines which Streams physical nodes read from. Sources are only
	// defined for nodes in the Fragment which read data across task boundaries.
	Sources map[physical.Node][]*Stream
	// Sinks defines which Streams physical nodes write to. Sinks are only
	// defined for nodes in the Fragment which write data across task boundaries.
	Sinks map[physical.Node][]*Stream
	// MaxTimeRange is the maximum boundary of timestamps that the task can
	// possibly emit. It does not account for predicates.
	//
	// MaxTimeRange is not read when executing a task fragment. It can be used
	// as metadata to control execution (such as cancelling ongoing tasks based
	// on their maximum time range).
	MaxTimeRange physical.TimeRange
}
A Task is a single unit of work within a workflow. Each Task is a partition of a local physical plan.
type TaskEventHandler
type TaskEventHandler func(ctx context.Context, t *Task, newStatus TaskStatus)
TaskEventHandler is a function that handles events for changed tasks.
type TaskState
type TaskState int
TaskState represents the state of a Task. It is sent, as part of a TaskStatus, by a Runner whenever a Task changes its state.
const (
	// TaskStateCreated represents the initial state for a Task, where it has
	// been created but not given to a [Runner].
	TaskStateCreated TaskState = iota
	// TaskStatePending represents a Task that is pending execution by a
	// [Runner].
	TaskStatePending
	// TaskStateRunning represents a Task that is currently being executed by a
	// [Runner].
	TaskStateRunning
	// TaskStateCompleted represents a Task that has completed successfully.
	TaskStateCompleted
	// TaskStateCancelled represents a Task that has been cancelled, either by a
	// [Runner] or the owning [Workflow].
	TaskStateCancelled
	// TaskStateFailed represents a Task that has failed during execution.
	TaskStateFailed
)
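The TaskStates variable lists the state names in the same order as the TaskState constants, so it can back a String method, and TaskStatus refers to a TaskState.Terminal method. The sketch below assumes that Completed, Cancelled, and Failed are the terminal states and that String indexes into TaskStates; neither method body is taken from the package.

```go
package main

import "fmt"

type TaskState int

const (
	TaskStateCreated TaskState = iota
	TaskStatePending
	TaskStateRunning
	TaskStateCompleted
	TaskStateCancelled
	TaskStateFailed
)

// TaskStates copies the documented variable; its order matches the constants.
var TaskStates = [...]string{"Created", "Pending", "Running", "Completed", "Cancelled", "Failed"}

// String looks the state up in TaskStates (a sketch; the package may differ).
func (s TaskState) String() string {
	if s >= 0 && int(s) < len(TaskStates) {
		return TaskStates[s]
	}
	return fmt.Sprintf("TaskState(%d)", int(s))
}

// Terminal assumes a task never leaves the Completed, Cancelled, or Failed
// states once it reaches one of them.
func (s TaskState) Terminal() bool {
	switch s {
	case TaskStateCompleted, TaskStateCancelled, TaskStateFailed:
		return true
	default:
		return false
	}
}

func main() {
	for s := TaskStateCreated; s <= TaskStateFailed; s++ {
		fmt.Printf("%-9s terminal=%v\n", s, s.Terminal())
	}
}
```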
type TaskStatus
type TaskStatus struct {
	State TaskState
	// Error holds the error that occurred during task execution, if any. Only
	// set when State is [TaskStateFailed].
	Error error
	// Capture contains observations about the execution of the task.
	Capture *xcap.Capture
	// Statistics reports analytics about the lifetime of a task. Only set
	// for terminal task states (see [TaskState.Terminal]).
	Statistics *stats.Result
	// ContributingTimeRange of a running task. Only set for non-terminal states.
	ContributingTimeRange ContributingTimeRange
}
TaskStatus holds the state of a task and additional information about that state.
type Workflow
type Workflow struct {
	// contains filtered or unexported fields
}
Workflow represents a physical plan that has been partitioned into parallelizable tasks.
func New
New creates a new Workflow from a physical plan. New returns an error if the physical plan does not have exactly one root node, or if the physical plan cannot be partitioned into a Workflow.
The provided Runner will be used for Workflow execution.
func (*Workflow) Close
func (wf *Workflow) Close()
Close releases resources associated with the workflow.