Documentation ¶
Constants ¶
const (
// UpdateSubtaskSummaryInterval is the interval for updating the subtask summary to
// subtask table.
UpdateSubtaskSummaryInterval = 3 * time.Second
// SubtaskSpeedUpdateInterval is the interval for updating the subtasks' speed.
SubtaskSpeedUpdateInterval = UpdateSubtaskSummaryInterval * maxProgressInSummary
)
Variables ¶
This section is empty.
Functions ¶
func SetFrameworkInfo ¶
func SetFrameworkInfo(
exec StepExecutor,
task *proto.Task,
resource *proto.StepResource,
updateCheckpointFunc func(context.Context, int64, any) error,
getCheckpointFunc func(context.Context, int64) (string, error),
)
SetFrameworkInfo sets the framework info for the StepExecutor.
Types ¶
type Collector ¶
type Collector interface {
// Accepted is used to collect metrics.
// The difference between Accepted and Processed is that Accepted is called
// when the data is accepted to be processed.
Accepted(bytes int64)
// Processed is used to collect metrics.
// `bytes` is the number of bytes processed, and `rows` is the number of rows processed.
// The meaning of `bytes` may vary by scenario, for example:
// - During encoding, it represents the number of bytes read from the source data file.
// - During merge sort, it represents the number of bytes merged.
Processed(bytes, rows int64)
}
Collector is the interface for collecting subtask metrics.
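As an illustration of the split between Accepted and Processed, here is a minimal sketch of a custom collector that keeps running totals. The Collector interface is redeclared locally so the example compiles on its own; `countingCollector` and `feed` are hypothetical names that do not exist in this package.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Collector mirrors the documented interface so that this sketch is
// self-contained; real code would use the package's own type.
type Collector interface {
	Accepted(bytes int64)
	Processed(bytes, rows int64)
}

// countingCollector is a hypothetical implementation that keeps running
// totals in atomics, so it is safe to share between goroutines.
type countingCollector struct {
	accepted  atomic.Int64
	processed atomic.Int64
	rows      atomic.Int64
}

// Accepted records bytes that were accepted for processing.
func (c *countingCollector) Accepted(bytes int64) {
	c.accepted.Add(bytes)
}

// Processed records bytes and rows that have finished processing.
func (c *countingCollector) Processed(bytes, rows int64) {
	c.processed.Add(bytes)
	c.rows.Add(rows)
}

// feed simulates workers that each accept one chunk and then process it.
func feed(c Collector, workers int, chunkBytes, chunkRows int64) {
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Accepted(chunkBytes)
			c.Processed(chunkBytes, chunkRows)
		}()
	}
	wg.Wait()
}

func main() {
	c := &countingCollector{}
	feed(c, 4, 1024, 10)
	fmt.Println(c.accepted.Load(), c.processed.Load(), c.rows.Load())
}
```

Using atomics rather than a mutex is a choice of this sketch; it matches the atomic fields that TestCollector and SubtaskSummary expose, but the package may synchronize differently.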
type NoopCollector ¶
type NoopCollector struct{}
NoopCollector is a no-op implementation of Collector.
func (*NoopCollector) Accepted ¶
func (*NoopCollector) Accepted(_ int64)
Accepted implements Collector.Accepted
func (*NoopCollector) Processed ¶
func (*NoopCollector) Processed(_, _ int64)
Processed implements Collector.Processed
type Progress ¶
type Progress struct {
// For now, RowCnt is not used, but as it's collected by the collector,
// we still keep it here for future possible usage.
RowCnt int64 `json:"row_count,omitempty"`
Bytes int64 `json:"bytes,omitempty"`
// UpdateTime is the time when this progress is stored.
UpdateTime time.Time `json:"update_time,omitempty"`
}
Progress represents the progress of a subtask at a specific time.
type StepExecFrameworkInfo ¶
type StepExecFrameworkInfo interface {
// GetStep returns the step.
GetStep() proto.Step
// GetResource returns the expected resource of this step executor.
GetResource() *proto.StepResource
// SetResource sets the resource of this step executor.
SetResource(resource *proto.StepResource)
// GetMeterRecorder returns the meter recorder for the corresponding task.
GetMeterRecorder() *metering.Recorder
// GetCheckpointUpdateFunc returns the checkpoint update function.
GetCheckpointUpdateFunc() func(context.Context, int64, any) error
// GetCheckpointFunc returns the checkpoint get function.
GetCheckpointFunc() func(context.Context, int64) (string, error)
// contains filtered or unexported methods
}
StepExecFrameworkInfo is an interface that should be embedded into the implementation of StepExecutor. The framework sets it automatically, and the implementation can use it to access necessary information. The framework initializes it before calling `StepExecutor.Init`; methods in this interface must not be called before that point.
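The embedding pattern described above can be sketched with plain Go types. Everything here is a simplified stand-in: `frameworkInfo`, `infoImpl`, `myExecutor`, and `ready` are hypothetical names, and `GetStep` returns an `int64` instead of `proto.Step` so the example has no external dependencies.

```go
package main

import "fmt"

// frameworkInfo is a simplified, hypothetical stand-in for
// StepExecFrameworkInfo.
type frameworkInfo interface {
	GetStep() int64
}

// infoImpl plays the role of the value that the framework provides.
type infoImpl struct{ step int64 }

func (i *infoImpl) GetStep() int64 { return i.step }

// myExecutor embeds the interface, so GetStep is promoted onto the
// executor once the embedded field has been set.
type myExecutor struct {
	frameworkInfo
}

// ready reports whether the embedded interface has been set. Calling a
// promoted method while it is nil would panic.
func (e *myExecutor) ready() bool {
	return e.frameworkInfo != nil
}

// Init is meant to run only after the embedded value is set, so using
// the promoted method is safe here.
func (e *myExecutor) Init() string {
	return fmt.Sprintf("initializing step %d", e.GetStep())
}

func main() {
	e := &myExecutor{}
	fmt.Println(e.ready())

	// In this sketch, plain assignment stands in for the framework's setup.
	e.frameworkInfo = &infoImpl{step: 2}
	fmt.Println(e.ready())
	fmt.Println(e.Init())
}
```

The sketch shows why the ordering matters: until the embedded interface is assigned it is nil, and any promoted method call on it would dereference a nil interface.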
type StepExecutor ¶
type StepExecutor interface {
StepExecFrameworkInfo
// Init is used to initialize the environment.
// The task executor will retry if the returned error is retryable, see
// IsRetryableError in TaskExecutor.Extension; otherwise the framework marks a random
// subtask as failed to trigger task failure.
Init(context.Context) error
// RunSubtask is used to run the subtask.
// The subtask meta can be updated in place, if no error returned, the subtask
// meta will be updated in the task table.
RunSubtask(ctx context.Context, subtask *proto.Subtask) error
// RealtimeSummary returns the realtime summary of the running subtask by this executor.
RealtimeSummary() *SubtaskSummary
// ResetSummary resets the summary of the running subtask by this executor.
ResetSummary()
// Cleanup is used to clean up the environment for this step.
// The returned error does not affect the task/subtask state and is only logged,
// so don't put code that's prone to error in it.
Cleanup(context.Context) error
// TaskMetaModified is called when the task meta is modified. If any error
// happens, the framework might recreate the step executor, so don't put code
// that's prone to error in it.
TaskMetaModified(ctx context.Context, newMeta []byte) error
// ResourceModified is called when the resource allowed to be used is modified
// and there is a subtask running. Note: if no subtask is running, the framework
// will call SetResource directly.
// The application must make sure the resource in use conforms to the new resource
// before returning. When reducing resources, the framework depends on this
// to make sure the current instance won't OOM.
ResourceModified(ctx context.Context, newResource *proto.StepResource) error
}
StepExecutor defines the executor for subtasks of a task step. The calling sequence is:
Init
for every subtask of this step:
    if RunSubtask failed then break
    else OnFinished
Cleanup
type SubtaskSummary ¶
type SubtaskSummary struct {
// RowCnt and Bytes are updated by the collector.
RowCnt atomic.Int64 `json:"row_count,omitempty"`
// Bytes is the number of bytes to process.
Bytes atomic.Int64 `json:"bytes,omitempty"`
// ReadBytes is the number of bytes read from the source.
ReadBytes atomic.Int64 `json:"read_bytes,omitempty"`
// GetReqCnt is the number of get requests to the external storage.
// Note: Import-into also does GET on the source data bucket, but that's not
// recorded.
GetReqCnt atomic.Uint64 `json:"get_request_count,omitempty"`
// PutReqCnt is the number of put requests to the external storage.
PutReqCnt atomic.Uint64 `json:"put_request_count,omitempty"`
// Progresses are the history of data processed, which is used to get a
// smoother speed for each subtask.
// It's updated each time we store the latest summary into subtask table.
Progresses []Progress `json:"progresses,omitempty"`
}
SubtaskSummary contains the runtime summary of a subtask.
func (*SubtaskSummary) GetSpeedInTimeRange ¶
GetSpeedInTimeRange returns the speed in the specified time range.
func (*SubtaskSummary) MergeObjStoreRequests ¶
func (s *SubtaskSummary) MergeObjStoreRequests(reqs *recording.Requests)
MergeObjStoreRequests merges the recording requests into the summary.
func (*SubtaskSummary) Reset ¶
func (s *SubtaskSummary) Reset()
Reset resets the summary to zero values and clears history data.
func (*SubtaskSummary) Update ¶
func (s *SubtaskSummary) Update()
Update stores the latest progress of the subtask.
func (*SubtaskSummary) UpdateTime ¶
func (s *SubtaskSummary) UpdateTime() time.Time
UpdateTime returns the last update time of the summary.
type TestCollector ¶
type TestCollector struct {
NoopCollector
ReadBytes atomic.Int64
Bytes atomic.Int64
Rows atomic.Int64
}
TestCollector is a Collector implementation used for testing.
func (*TestCollector) Accepted ¶
func (c *TestCollector) Accepted(bytes int64)
Accepted implements Collector.Accepted
func (*TestCollector) Processed ¶
func (c *TestCollector) Processed(bytes, rows int64)
Processed implements Collector.Processed