Documentation
¶
Index ¶
- Constants
- Variables
- func IsValidStep(t TaskType, s Step) bool
- func Step2Str(t TaskType, s Step) string
- func Type2Int(t TaskType) int
- type Allocatable
- type ExtraParams
- type ManagedNode
- type Modification
- type ModificationType
- type ModifyParam
- type NodeResource
- type Step
- type StepResource
- type Subtask
- type SubtaskBase
- type SubtaskState
- type Task
- type TaskBase
- type TaskState
- type TaskType
Constants ¶
const ( // TaskIDLabelName is the label name of task id. TaskIDLabelName = "task_id" // NormalPriority represents the normal priority of task. NormalPriority = 512 )
Variables ¶
var ( // EmptyMeta is the empty meta of task/subtask. EmptyMeta = []byte("{}") )
var MaxConcurrentTask = 16
MaxConcurrentTask is the max concurrency of task. TODO: remove this limit later.
var NodeResourceForTest = NewNodeResource(32, 32*units.GB, 100*units.GB)
NodeResourceForTest is only used for test.
Functions ¶
func IsValidStep ¶
IsValidStep returns whether the step is valid for the task type.
Types ¶
type Allocatable ¶
type Allocatable struct {
// contains filtered or unexported fields
}
Allocatable is a resource with capacity that can be allocated; it's goroutine safe.
func NewAllocatable ¶
func NewAllocatable(capacity int64) *Allocatable
NewAllocatable creates a new Allocatable.
func (*Allocatable) Alloc ¶
func (a *Allocatable) Alloc(n int64) bool
Alloc allocates n from the Allocatable.
func (*Allocatable) Capacity ¶
func (a *Allocatable) Capacity() int64
Capacity returns the capacity of the Allocatable.
func (*Allocatable) Used ¶
func (a *Allocatable) Used() int64
Used returns the used resource of the Allocatable.
type ExtraParams ¶
type ExtraParams struct {
// ManualRecovery indicates whether the task can be recovered manually.
// if enabled, the task will enter 'awaiting-resolution' state when it fails,
// then the user can recover the task manually or fail it if it's not recoverable.
ManualRecovery bool `json:"manual_recovery,omitempty"`
// MaxRuntimeSlots is the max slots when running subtasks of this task in
// TargetSteps steps.
// normally it's 0, means we will use the RequiredSlots to run the subtasks.
// if set, we will use the min of RequiredSlots and MaxRuntimeSlots as the
// execution effective slots of the task step defined in TargetSteps.
// this field is used to work around OOM issues where TiDB might repeatedly
// restart. the DXF framework won't detect changes in this field, so it's not
// part of the normal scheduling workflow; when TiDB restarts, the newest value
// will be used.
// RequiredSlots might be modified while MaxRuntimeSlots is left untouched for
// the above reason, so MaxRuntimeSlots might be greater than RequiredSlots.
MaxRuntimeSlots int `json:"max_runtime_slots,omitempty"`
// TargetSteps indicates the steps that MaxRuntimeSlots takes effect.
// if empty or nil, MaxRuntimeSlots takes effect in all steps.
// normally OOM only happens in some specific steps, so we can just limit the
// concurrency in those steps to reduce the impact on the overall performance.
TargetSteps []Step `json:"target_steps,omitempty"`
}
ExtraParams is the extra params of task. Note: only store params that are not used for filtering or sorting in this struct.
type ManagedNode ¶
type ManagedNode struct {
// ID see GenerateExecID, it's named as host in the meta table.
ID string
// Role of the node, either "" or "background"
// all managed nodes should have the same role
Role string
CPUCount int
}
ManagedNode is a TiDB node that is managed by the framework.
type Modification ¶
type Modification struct {
Type ModificationType `json:"type"`
To int64 `json:"to"`
}
Modification is one modification for task.
func (Modification) String ¶
func (m Modification) String() string
String implements fmt.Stringer interface.
type ModificationType ¶
type ModificationType string
ModificationType is the type of task modification.
const ( // ModifyRequiredSlots is the type for modifying task required slots. // Note: required slots is introduced later and separated from the old // "concurrency" concept, we still use "modify_concurrency" as the modification // type for compatibility. ModifyRequiredSlots ModificationType = "modify_concurrency" // ModifyMaxNodeCount is the type for modifying max node count of task. ModifyMaxNodeCount ModificationType = "modify_max_node_count" // ModifyBatchSize is the type for modifying batch size of add-index. ModifyBatchSize ModificationType = "modify_batch_size" // ModifyMaxWriteSpeed is the type for modifying max write speed of add-index. ModifyMaxWriteSpeed ModificationType = "modify_max_write_speed" )
func (ModificationType) String ¶
func (t ModificationType) String() string
String implements fmt.Stringer interface.
type ModifyParam ¶
type ModifyParam struct {
PrevState TaskState `json:"prev_state"`
Modifications []Modification `json:"modifications"`
}
ModifyParam is the parameter for task modification.
func (*ModifyParam) String ¶
func (p *ModifyParam) String() string
String implements fmt.Stringer interface.
type NodeResource ¶
NodeResource is the resource of the node. exported for test.
func NewNodeResource ¶
func NewNodeResource(totalCPU int, totalMem int64, totalDisk uint64) *NodeResource
NewNodeResource creates a new NodeResource.
func (*NodeResource) GetStepResource ¶
func (nr *NodeResource) GetStepResource(task *TaskBase) *StepResource
GetStepResource gets the step resource according to slots.
func (*NodeResource) GetTaskDiskResource ¶
func (nr *NodeResource) GetTaskDiskResource(task *TaskBase, quotaHint uint64) uint64
GetTaskDiskResource gets available disk for a task.
type Step ¶
type Step int64
Step is the step of task.
DO NOT change the values of the step constants; doing so will break backward compatibility. A successful task MUST go from StepInit through its business steps, then to StepDone.
const ( // ImportStepImport we sort source data and ingest it into TiKV in this step. ImportStepImport Step = 1 // ImportStepPostProcess we verify checksum and add index in this step. ImportStepPostProcess Step = 2 // ImportStepEncodeAndSort encode source data and write sorted kv into global storage. ImportStepEncodeAndSort Step = 3 // ImportStepMergeSort merge sorted kv from global storage, so we can have better // read performance during ImportStepWriteAndIngest. // depending on how much the kv files overlap, there might be 0 subtasks // in this step. ImportStepMergeSort Step = 4 // ImportStepWriteAndIngest write sorted kv into TiKV and ingest it. ImportStepWriteAndIngest Step = 5 // ImportStepCollectConflicts collect conflicts info, this step won't mutate // downstream data, so is idempotent, and we can collect a correct checksum // for the conflicted rows. if we do this together with ImportStepConflictResolution, // once the step retries in the middle, we can't get a correct checksum. // this step also needs to do deduplication for the conflicted rows due to // multiple unique indexes to avoid repeated collection, currently, we do it // in memory, so if there are too many conflicts, we will skip the later // checksum step as we don't know the exact checksum. ImportStepCollectConflicts Step = 6 // ImportStepConflictResolution resolve detected conflicts. // during other steps of global sort, we will detect conflicts and record them // in external storage, if any conflicts are detected, we will resolve them // here. so there might be 0 subtasks in this step. ImportStepConflictResolution Step = 7 )
Steps of IMPORT INTO; each step is represented by one or multiple subtasks. The initial step is StepInit(-1), and steps are processed in the following order:
- local sort: StepInit -> ImportStepImport -> ImportStepPostProcess -> StepDone
- global sort: StepInit -> ImportStepEncodeAndSort -> ImportStepMergeSort (optional) -> ImportStepWriteAndIngest -> ImportStepCollectConflicts (optional) -> ImportStepConflictResolution (optional) -> ImportStepPostProcess -> StepDone
const ( BackfillStepReadIndex Step = 1 // BackfillStepMergeSort is only used in global sort, it will merge sorted kv from global storage, so we can have better // read performance during BackfillStepWriteAndIngest with global sort. // whether it has subtasks depends on how much the kv files overlap. // When kv files overlap less than MergeSortOverlapThreshold, there are no subtasks. BackfillStepMergeSort Step = 2 // BackfillStepWriteAndIngest write sorted kv into TiKV and ingest it. BackfillStepWriteAndIngest Step = 3 // BackfillStepMergeTempIndex is the step to merge temp index into the original index. BackfillStepMergeTempIndex Step = 4 )
Steps of Add Index; each step is represented by one or multiple subtasks. The initial step is StepInit(-1), and steps are processed in the following order:
- local sort: StepInit -> BackfillStepReadIndex -> StepDone
- global sort: StepInit -> BackfillStepReadIndex -> BackfillStepMergeSort -> BackfillStepWriteAndIngest -> StepDone
type StepResource ¶
type StepResource struct {
CPU *Allocatable
Mem *Allocatable
}
StepResource is the max resource that a task step can use. it's also the max resource that a subtask can use, as we run subtasks of task step in sequence.
func (*StepResource) String ¶
func (s *StepResource) String() string
String implements Stringer interface.
type Subtask ¶
type Subtask struct {
SubtaskBase
// UpdateTime is the time when the subtask is updated.
// it can be used as subtask end time if the subtask is finished.
// it's 0 if it hasn't started yet.
UpdateTime time.Time
// Meta is the metadata of subtask, should not be nil.
// meta of different subtasks of same step must be different too.
// NOTE: this field can be changed by StepExecutor.OnFinished method, to store
// some result, and framework will update the subtask meta in the storage.
// On other code path, this field should be read-only.
Meta []byte
Summary string
}
Subtask represents the subtask of the distributed framework. Subtasks of a task run in parallel on different nodes, but on each node at most 1 subtask can run at the same time; see StepExecutor too.
type SubtaskBase ¶
type SubtaskBase struct {
ID int64
Step Step
Type TaskType
// taken from task_key of the subtask table
TaskID int64
State SubtaskState
// Concurrency is the concurrency of the subtask.
// it's initialized as the task's required slots, and it's NOT used now.
// if the required slot of task is modified, the concurrency of un-finished
// subtasks of the task will be updated too.
// some subtasks, like the post-process step of IMPORT INTO, don't consume many
// resources and could use a lower value; this field can be used to implement
// such a feature later.
Concurrency int
// ExecID is the ID of target executor, right now it's the same as instance_id,
// its value is IP:PORT, see GenerateExecID
ExecID string
CreateTime time.Time
// StartTime is the time when the subtask is started.
// it's 0 if it hasn't started yet.
StartTime time.Time
// Ordinal is the ordinal of subtask, should be unique within the same task and step.
// starts from 1.
Ordinal int
}
SubtaskBase contains the basic information of a subtask. We define this to avoid loading subtask meta, which might be very large, into memory.
func (*SubtaskBase) IsDone ¶
func (t *SubtaskBase) IsDone() bool
IsDone checks if the subtask is done.
func (*SubtaskBase) String ¶
func (t *SubtaskBase) String() string
type SubtaskState ¶
type SubtaskState string
SubtaskState is the state of subtask.
const ( SubtaskStatePending SubtaskState = "pending" SubtaskStateRunning SubtaskState = "running" SubtaskStateSucceed SubtaskState = "succeed" SubtaskStateFailed SubtaskState = "failed" SubtaskStateCanceled SubtaskState = "canceled" SubtaskStatePaused SubtaskState = "paused" )
see doc.go for more details.
func (SubtaskState) String ¶
func (s SubtaskState) String() string
type Task ¶
type Task struct {
TaskBase
// SchedulerID is not used now.
SchedulerID string
StartTime time.Time
StateUpdateTime time.Time
// Meta is the metadata of task, it's read-only in most cases, but it can be
// changed in below case, and framework will update the task meta in the storage.
// - task switches to next step in Scheduler.OnNextSubtasksBatch
// - on task cleanup, we might do some redaction on the meta.
// - on task 'modifying', params inside the meta can be changed.
Meta []byte
Error error
ModifyParam ModifyParam
}
Task represents the task of distributed framework, see doc.go for more details.
type TaskBase ¶
type TaskBase struct {
ID int64
Key string
Type TaskType
State TaskState
Step Step
// Priority is the priority of task, the smaller value means the higher priority.
// valid range is [1, 1024], default is NormalPriority.
Priority int
// RequiredSlots is the required slots of the task.
// we use this field to allocate slots when scheduling and creating the task
// executor, but the effective slots when running the task is determined by
// GetRuntimeSlots.
// in normal case, they are the same. but when meeting OOM and TiDB repeatedly
// restarts, we might set a lower MaxRuntimeSlots in ExtraParams, then the
// effective slots is smaller than RequiredSlots.
// Note: in application layer, don't use this field directly, use GetRuntimeSlots
// or GetResource of step executor instead.
// Note: in the system table, we store it inside 'concurrency' column as
// required slots is introduced later.
RequiredSlots int
// TargetScope indicates that the task should be running on tidb nodes which
// contain the tidb_service_scope=TargetScope label.
// To be compatible with previous versions, if it's "" or "background", the
// task tries to run on nodes of "background" scope;
// if there are no such nodes, it will try nodes of "" scope.
TargetScope string
CreateTime time.Time
MaxNodeCount int
ExtraParams ExtraParams
// Keyspace is the name of the keyspace that the task belongs to.
// it's only useful for nextgen cluster.
Keyspace string
}
TaskBase contains the basic information of a task. We define this to avoid loading task meta, which might be very large, into memory.
func (*TaskBase) Compare ¶
Compare compares two tasks by task rank. A return value < 0 means the rank of t is higher than that of 'other'.
func (*TaskBase) CompareTask ¶
CompareTask is a wrapper of Compare.
func (*TaskBase) GetRuntimeSlots ¶
GetRuntimeSlots gets the runtime slots of current task step. application layer might use this as the concurrency of the task step.
type TaskState ¶
type TaskState string
TaskState is the state of task.
const ( TaskStatePending TaskState = "pending" TaskStateRunning TaskState = "running" TaskStateSucceed TaskState = "succeed" TaskStateFailed TaskState = "failed" TaskStateReverting TaskState = "reverting" TaskStateAwaitingResolution TaskState = "awaiting-resolution" TaskStateReverted TaskState = "reverted" TaskStateCancelling TaskState = "cancelling" TaskStatePausing TaskState = "pausing" TaskStatePaused TaskState = "paused" TaskStateResuming TaskState = "resuming" TaskStateModifying TaskState = "modifying" )
see doc.go for more details.
func (TaskState) CanMoveToModifying ¶
CanMoveToModifying checks if current state can move to 'modifying' state.