execute

package
v1.1.0-beta.0...-cb5f862
Published: Jan 1, 2026 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Constants

const (
	// UpdateSubtaskSummaryInterval is the interval for updating the subtask summary to
	// subtask table.
	UpdateSubtaskSummaryInterval = 3 * time.Second

	// SubtaskSpeedUpdateInterval is the interval for updating the subtasks' speed.
	SubtaskSpeedUpdateInterval = UpdateSubtaskSummaryInterval * maxProgressInSummary
)

Variables

This section is empty.

Functions

func SetFrameworkInfo

func SetFrameworkInfo(
	exec StepExecutor,
	task *proto.Task,
	resource *proto.StepResource,
	updateCheckpointFunc func(context.Context, int64, any) error,
	getCheckpointFunc func(context.Context, int64) (string, error),
)

SetFrameworkInfo sets the framework info for the StepExecutor.

Types

type Collector

type Collector interface {
	// Accepted is used to collect metrics.
	// The difference between Accepted and Processed is that Accepted is called
	// when the data is accepted to be processed.
	Accepted(bytes int64)
	// Processed is used to collect metrics.
	// `bytes` is the number of bytes processed, and `rows` is the number of rows processed.
	// The meaning of `bytes` may vary by scenario, for example:
	//   - During encoding, it represents the number of bytes read from the source data file.
	//   - During merge sort, it represents the number of bytes merged.
	Processed(bytes, rows int64)
}

Collector is the interface for collecting subtask metrics.
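
As an illustration of the interface above, here is a minimal sketch of a custom Collector that keeps running totals. The Collector interface is redeclared locally so the example compiles on its own; `countingCollector` and its field names are hypothetical and not part of the package.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Collector mirrors the interface documented above so this sketch compiles on
// its own; real code would use the one from the execute package.
type Collector interface {
	Accepted(bytes int64)
	Processed(bytes, rows int64)
}

// countingCollector is a hypothetical implementation that keeps running
// totals. Atomics are used because collectors may be called from several
// goroutines (an assumption, not something the documentation states).
type countingCollector struct {
	acceptedBytes  atomic.Int64
	processedBytes atomic.Int64
	processedRows  atomic.Int64
}

// Accepted records bytes that were accepted for processing.
func (c *countingCollector) Accepted(bytes int64) {
	c.acceptedBytes.Add(bytes)
}

// Processed records bytes and rows that finished processing.
func (c *countingCollector) Processed(bytes, rows int64) {
	c.processedBytes.Add(bytes)
	c.processedRows.Add(rows)
}

// Compile-time check that countingCollector satisfies Collector.
var _ Collector = (*countingCollector)(nil)

func main() {
	c := &countingCollector{}
	c.Accepted(4096)
	c.Processed(1024, 10)
	c.Processed(3072, 30)
	fmt.Println(c.acceptedBytes.Load(), c.processedBytes.Load(), c.processedRows.Load())
}
```

Keeping Accepted and Processed in separate counters preserves the distinction the interface comment draws between data that has been taken in and data that has actually been handled.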

type NoopCollector

type NoopCollector struct{}

NoopCollector is a no-op implementation of Collector.

func (*NoopCollector) Accepted

func (*NoopCollector) Accepted(_ int64)

Accepted implements Collector.Accepted

func (*NoopCollector) Processed

func (*NoopCollector) Processed(_, _ int64)

Processed implements Collector.Processed

type Progress

type Progress struct {
	// For now, RowCnt is not used, but as it's collected by the collector,
	// we still keep it here for future possible usage.
	RowCnt int64 `json:"row_count,omitempty"`
	Bytes  int64 `json:"bytes,omitempty"`

	// UpdateTime is the time when this progress is stored.
	UpdateTime time.Time `json:"update_time,omitempty"`
}

Progress represents the progress of a subtask at a specific time.
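
The struct tags above determine how a Progress value is serialized. The sketch below copies the struct and encodes it with encoding/json; `encodeProgress` is a hypothetical helper. One detail worth noting: `omitempty` drops zero integers, but it has no effect on a struct-typed field such as time.Time, so `update_time` is always emitted.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Progress is copied from the declaration above so the sketch is self-contained.
type Progress struct {
	RowCnt     int64     `json:"row_count,omitempty"`
	Bytes      int64     `json:"bytes,omitempty"`
	UpdateTime time.Time `json:"update_time,omitempty"`
}

// encodeProgress is a hypothetical helper that returns the JSON form of p,
// or an empty string if encoding fails.
func encodeProgress(p Progress) string {
	b, err := json.Marshal(p)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	p := Progress{Bytes: 1024, UpdateTime: time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)}
	fmt.Println(encodeProgress(p))
	// prints {"bytes":1024,"update_time":"2026-01-01T00:00:00Z"}

	// The zero value still carries update_time, because omitempty does not
	// treat a zero time.Time as empty.
	fmt.Println(encodeProgress(Progress{}))
	// prints {"update_time":"0001-01-01T00:00:00Z"}
}
```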

type StepExecFrameworkInfo

type StepExecFrameworkInfo interface {

	// GetStep returns the step.
	GetStep() proto.Step
	// GetResource returns the expected resource of this step executor.
	GetResource() *proto.StepResource
	// SetResource sets the resource of this step executor.
	SetResource(resource *proto.StepResource)
	// GetMeterRecorder returns the meter recorder for the corresponding task.
	GetMeterRecorder() *metering.Recorder
	// GetCheckpointUpdateFunc returns the checkpoint update function.
	GetCheckpointUpdateFunc() func(context.Context, int64, any) error
	// GetCheckpointFunc returns the checkpoint get function.
	GetCheckpointFunc() func(context.Context, int64) (string, error)
	// contains filtered or unexported methods
}

StepExecFrameworkInfo is an interface that should be embedded into the implementation of StepExecutor. The framework sets it automatically, and the implementation can use it to access necessary information. The framework initializes it before calling `StepExecutor.Init`; methods of this interface must not be called before that point.
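
The embedding pattern described above can be sketched as follows. This is a simplified illustration, not the package's own code: `frameworkInfo`, `info`, and `myExecutor` are hypothetical stand-ins, and the direct field assignment stands in for what SetFrameworkInfo would do. It shows why promoted methods are only safe to call once the embedded interface has been set.

```go
package main

import "fmt"

// frameworkInfo is a simplified, hypothetical stand-in for
// StepExecFrameworkInfo with a single method.
type frameworkInfo interface {
	GetStep() string
}

// info is a hypothetical concrete value that the framework would inject.
type info struct{ step string }

func (i *info) GetStep() string { return i.step }

// myExecutor embeds the interface, so GetStep is promoted onto the executor.
// Until the embedded field is set it is nil, and calling a promoted method
// would panic.
type myExecutor struct {
	frameworkInfo
	stepName string
}

// Init is where the promoted methods first become safe to use. The nil check
// is only here to make the ordering requirement visible.
func (e *myExecutor) Init() error {
	if e.frameworkInfo == nil {
		return fmt.Errorf("framework info not set")
	}
	e.stepName = e.GetStep()
	return nil
}

func main() {
	e := &myExecutor{}
	fmt.Println(e.Init()) // the embedded interface is still nil here

	e.frameworkInfo = &info{step: "encode"} // stands in for SetFrameworkInfo
	fmt.Println(e.Init(), e.stepName)
}
```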

type StepExecutor

type StepExecutor interface {
	StepExecFrameworkInfo

	// Init is used to initialize the environment.
	// The task executor retries if the returned error is retryable (see
	// IsRetryableError in TaskExecutor.Extension); otherwise the framework marks
	// a random subtask as failed to trigger task failure.
	Init(context.Context) error
	// RunSubtask is used to run the subtask.
	// The subtask meta can be updated in place; if no error is returned, the
	// subtask meta is updated in the task table.
	RunSubtask(ctx context.Context, subtask *proto.Subtask) error

	// RealtimeSummary returns the realtime summary of the subtask being run by this executor.
	RealtimeSummary() *SubtaskSummary

	// ResetSummary resets the summary of the subtask being run by this executor.
	ResetSummary()

	// Cleanup is used to clean up the environment for this step.
	// The returned error does not affect the task/subtask state; it is only
	// logged, so don't put error-prone code in it.
	Cleanup(context.Context) error
	// TaskMetaModified is called when the task meta is modified. If any error
	// happens, the framework might recreate the step executor, so don't put
	// error-prone code in it.
	TaskMetaModified(ctx context.Context, newMeta []byte) error
	// ResourceModified is called when the resource allowed to be used is modified
	// and there is a subtask running. Note: if no subtask is running, the
	// framework calls SetResource directly.
	// The application must make sure the resource in use conforms to the new
	// resource before returning. When reducing resources, the framework depends
	// on this to make sure the current instance won't OOM.
	ResourceModified(ctx context.Context, newResource *proto.StepResource) error
}

StepExecutor defines the executor for subtasks of a task step. The calling sequence is:

Init
for every subtask of this step:
	if RunSubtask failed then break
	else OnFinished
Cleanup
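
The calling sequence above can be sketched as a small driver loop. This is a minimal sketch under stated assumptions, not the framework's scheduler: `stepExecutor` is a trimmed, hypothetical subset of StepExecutor, subtasks are reduced to integer IDs, and `runStep`, `recorder`, and `runDemo` are illustrative names. Returning early when Init fails, without calling Cleanup, is also an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// stepExecutor is a trimmed, hypothetical subset of StepExecutor.
type stepExecutor interface {
	Init(context.Context) error
	RunSubtask(ctx context.Context, subtaskID int64) error
	Cleanup(context.Context) error
}

// runStep drives one executor through Init, the subtask loop, and Cleanup.
func runStep(ctx context.Context, exec stepExecutor, subtaskIDs []int64) error {
	if err := exec.Init(ctx); err != nil {
		return err // assumption: no Cleanup when Init fails
	}
	var runErr error
	for _, id := range subtaskIDs {
		if runErr = exec.RunSubtask(ctx, id); runErr != nil {
			break // stop at the first failed subtask
		}
	}
	// A Cleanup error is only logged; it does not change the result.
	if err := exec.Cleanup(ctx); err != nil {
		fmt.Println("cleanup failed:", err)
	}
	return runErr
}

// recorder is a hypothetical executor that records the calls it receives and
// fails on the subtask whose ID equals failOn.
type recorder struct {
	calls  []string
	failOn int64
}

func (r *recorder) Init(context.Context) error {
	r.calls = append(r.calls, "Init")
	return nil
}

func (r *recorder) RunSubtask(_ context.Context, id int64) error {
	r.calls = append(r.calls, fmt.Sprintf("RunSubtask(%d)", id))
	if id == r.failOn {
		return errors.New("subtask failed")
	}
	return nil
}

func (r *recorder) Cleanup(context.Context) error {
	r.calls = append(r.calls, "Cleanup")
	return nil
}

// runDemo runs subtasks 1, 2, 3 and returns the recorded call order.
func runDemo(failOn int64) ([]string, error) {
	r := &recorder{failOn: failOn}
	err := runStep(context.Background(), r, []int64{1, 2, 3})
	return r.calls, err
}

func main() {
	calls, err := runDemo(2)
	fmt.Println(calls, err)
	// prints [Init RunSubtask(1) RunSubtask(2) Cleanup] subtask failed
}
```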

type SubtaskSummary

type SubtaskSummary struct {
	// RowCnt and Bytes are updated by the collector.
	RowCnt atomic.Int64 `json:"row_count,omitempty"`
	// Bytes is the number of bytes to process.
	Bytes atomic.Int64 `json:"bytes,omitempty"`
	// ReadBytes is the number of bytes read from the source.
	ReadBytes atomic.Int64 `json:"read_bytes,omitempty"`
	// GetReqCnt is the number of get requests to the external storage.
	// Note: Import-into also does GET on the source data bucket, but that's not
	// recorded.
	GetReqCnt atomic.Uint64 `json:"get_request_count,omitempty"`
	// PutReqCnt is the number of put requests to the external storage.
	PutReqCnt atomic.Uint64 `json:"put_request_count,omitempty"`

	// Progresses are the history of data processed, which is used to get a
	// smoother speed for each subtask.
	// It's updated each time we store the latest summary into subtask table.
	Progresses []Progress `json:"progresses,omitempty"`
}

SubtaskSummary contains the summary of a subtask. It tracks the runtime summary of the subtask.

func (*SubtaskSummary) GetSpeedInTimeRange

func (s *SubtaskSummary) GetSpeedInTimeRange(endTime time.Time, duration time.Duration) int64

GetSpeedInTimeRange returns the speed in the specified time range.

func (*SubtaskSummary) MergeObjStoreRequests

func (s *SubtaskSummary) MergeObjStoreRequests(reqs *recording.Requests)

MergeObjStoreRequests merges the recording requests into the summary.

func (*SubtaskSummary) Reset

func (s *SubtaskSummary) Reset()

Reset resets the summary to zero values and clears history data.

func (*SubtaskSummary) Update

func (s *SubtaskSummary) Update()

Update stores the latest progress of the subtask.

func (*SubtaskSummary) UpdateTime

func (s *SubtaskSummary) UpdateTime() time.Time

UpdateTime returns the last update time of the summary.

type TestCollector

type TestCollector struct {
	NoopCollector
	ReadBytes atomic.Int64
	Bytes     atomic.Int64
	Rows      atomic.Int64
}

TestCollector is a Collector implementation used for testing.

func (*TestCollector) Accepted

func (c *TestCollector) Accepted(bytes int64)

Accepted implements Collector.Accepted

func (*TestCollector) Processed

func (c *TestCollector) Processed(bytes, rows int64)

Processed implements Collector.Processed
