proto

package
v1.1.0-beta.0...-007466b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 3, 2026 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// TaskIDLabelName is the label name of task id.
	TaskIDLabelName = "task_id"
	// NormalPriority represents the normal priority of task. It is the default
	// value of TaskBase.Priority, whose valid range is [1, 1024]; a smaller
	// value means a higher priority.
	NormalPriority = 512
)

Variables

View Source
var (
	// EmptyMeta is the empty meta of task/subtask: the literal bytes "{}".
	EmptyMeta = []byte("{}")
)
View Source
var MaxConcurrentTask = 16

MaxConcurrentTask is the max concurrency of task. TODO: remove this limit later.

View Source
// NodeResourceForTest is a fixed NodeResource that is only used for test.
// Following the parameter order of NewNodeResource (totalCPU, totalMem,
// totalDisk), it describes 32 CPUs, 32*units.GB of memory and 100*units.GB
// of disk.
var NodeResourceForTest = NewNodeResource(32, 32*units.GB, 100*units.GB)

NodeResourceForTest is only used for test.

Functions

func IsValidStep

func IsValidStep(t TaskType, s Step) bool

IsValidStep returns whether the step is valid for the task type.

func Step2Str

func Step2Str(t TaskType, s Step) string

Step2Str converts step to string. it's too bad that we define step as int 🙃.

func Type2Int

func Type2Int(t TaskType) int

Type2Int converts task type to int.

Types

type Allocatable

// Allocatable is a resource with a capacity that can be allocated.
// Create one with NewAllocatable; its state is only reachable through the
// Alloc, Free, Capacity and Used methods because all fields are unexported.
type Allocatable struct {
	// contains filtered or unexported fields
}

Allocatable is a resource with a capacity that can be allocated; it is goroutine-safe.

func NewAllocatable

func NewAllocatable(capacity int64) *Allocatable

NewAllocatable creates a new Allocatable.

func (*Allocatable) Alloc

func (a *Allocatable) Alloc(n int64) bool

Alloc allocates n from the Allocatable.

func (*Allocatable) Capacity

func (a *Allocatable) Capacity() int64

Capacity returns the capacity of the Allocatable.

func (*Allocatable) Free

func (a *Allocatable) Free(n int64)

Free frees n from the Allocatable.

func (*Allocatable) Used

func (a *Allocatable) Used() int64

Used returns the used resource of the Allocatable.

type ExtraParams

// ExtraParams holds the extra parameters of a task. Every field carries a JSON
// tag with omitempty, so zero values are left out of the encoded form.
type ExtraParams struct {
	// ManualRecovery indicates whether the task can be recovered manually.
	// If enabled, the task enters the 'awaiting-resolution' state when it fails;
	// the user can then recover the task manually, or fail it if it's not
	// recoverable.
	ManualRecovery bool `json:"manual_recovery,omitempty"`
	// MaxRuntimeSlots is the max slots when running subtasks of this task in
	// the steps listed in TargetSteps.
	// Normally it's 0, which means RequiredSlots is used to run the subtasks.
	// If set, the min of RequiredSlots and MaxRuntimeSlots is used as the
	// effective execution slots of the task steps defined in TargetSteps.
	// This field is a workaround for OOM issues where TiDB might repeatedly
	// restart. The DXF framework won't detect changes in this field, so it's not
	// part of the normal schedule workflow; when TiDB restarts, the newest value
	// will be used.
	// RequiredSlots might be modified, and MaxRuntimeSlots is not touched in
	// that case for the reason above, so MaxRuntimeSlots might be greater than
	// RequiredSlots.
	MaxRuntimeSlots int `json:"max_runtime_slots,omitempty"`
	// TargetSteps indicates the steps in which MaxRuntimeSlots takes effect.
	// If empty or nil, MaxRuntimeSlots takes effect in all steps.
	// Normally OOM only happens in some specific steps, so we can limit the
	// concurrency in just those steps to reduce the impact on overall
	// performance.
	TargetSteps []Step `json:"target_steps,omitempty"`
}

ExtraParams is the extra params of task. Note: only store params that are not used for filtering or sorting in this struct.

type ManagedNode

// ManagedNode is a TiDB node that is managed by the framework.
type ManagedNode struct {
	// ID identifies the node, see GenerateExecID. It's named "host" in the
	// meta table.
	ID string
	// Role of the node, either "" or "background".
	// All managed nodes should have the same role.
	Role     string
	CPUCount int
}

ManagedNode is a TiDB node that is managed by the framework.

type Modification

// Modification is one modification for task. Type selects what is modified
// (see the ModificationType constants) and To carries the associated int64
// value — presumably the new value to apply; confirm against the code that
// consumes it. Both fields are serialized to JSON under "type" and "to".
type Modification struct {
	Type ModificationType `json:"type"`
	To   int64            `json:"to"`
}

Modification is one modification for task.

func (Modification) String

func (m Modification) String() string

String implements fmt.Stringer interface.

type ModificationType

type ModificationType string

ModificationType is the type of task modification.

// Supported kinds of task modification. The string values are kept stable for
// compatibility (see the note on ModifyRequiredSlots), so do not rename them.
const (
	// ModifyRequiredSlots is the type for modifying task required slots.
	// Note: required slots were introduced later and separated from the old
	// "concurrency" concept; we still use "modify_concurrency" as the
	// modification type for compatibility.
	ModifyRequiredSlots ModificationType = "modify_concurrency"
	// ModifyMaxNodeCount is the type for modifying max node count of task.
	ModifyMaxNodeCount ModificationType = "modify_max_node_count"
	// ModifyBatchSize is the type for modifying batch size of add-index.
	ModifyBatchSize ModificationType = "modify_batch_size"
	// ModifyMaxWriteSpeed is the type for modifying max write speed of add-index.
	ModifyMaxWriteSpeed ModificationType = "modify_max_write_speed"
)

func (ModificationType) String

func (t ModificationType) String() string

String implements fmt.Stringer interface.

type ModifyParam

// ModifyParam is the parameter for task modification. It is serialized to
// JSON with the keys "prev_state" and "modifications".
// NOTE(review): PrevState is presumably the state the task was in before it
// moved to 'modifying' — confirm against the code that sets it.
type ModifyParam struct {
	PrevState     TaskState      `json:"prev_state"`
	Modifications []Modification `json:"modifications"`
}

ModifyParam is the parameter for task modification.

func (*ModifyParam) String

func (p *ModifyParam) String() string

String implements fmt.Stringer interface.

type NodeResource

// NodeResource is the resource of the node: its total CPU, memory and disk.
// It is exported for test. The units of TotalMem and TotalDisk are not stated
// here; NodeResourceForTest passes multiples of units.GB for both.
type NodeResource struct {
	TotalCPU  int
	TotalMem  int64
	TotalDisk uint64
}

NodeResource is the resource of the node. exported for test.

func NewNodeResource

func NewNodeResource(totalCPU int, totalMem int64, totalDisk uint64) *NodeResource

NewNodeResource creates a new NodeResource.

func (*NodeResource) GetStepResource

func (nr *NodeResource) GetStepResource(task *TaskBase) *StepResource

GetStepResource gets the step resource according to slots.

func (*NodeResource) GetTaskDiskResource

func (nr *NodeResource) GetTaskDiskResource(task *TaskBase, quotaHint uint64) uint64

GetTaskDiskResource gets available disk for a task.

type Step

type Step int64

Step is the step of task.

// Steps shared by all task types. DO NOT change the values of these
// constants, as that would break backward compatibility.
const (
	// StepInit is the initial step of a task.
	StepInit Step = -1
	// StepDone is the step a successful task ends in, after its business steps.
	StepDone Step = -2
)

StepInit and StepDone are the steps shared by all tasks. DO NOT change the values of these constants; doing so will break backward compatibility. A successful task MUST go from StepInit to its business steps, then to StepDone.

// Steps of the example task type. Their values overlap with the steps of
// other task types, so a Step value is only meaningful together with its
// TaskType.
const (
	StepOne   Step = 1
	StepTwo   Step = 2
	StepThree Step = 3
)

Steps of example task type.

// Steps of IMPORT INTO. The numeric values do not reflect execution order:
// for example ImportStepPostProcess (2) runs last in both the local sort and
// the global sort flow.
const (
	// ImportStepImport we sort source data and ingest it into TiKV in this step.
	ImportStepImport Step = 1
	// ImportStepPostProcess we verify checksum and add index in this step.
	ImportStepPostProcess Step = 2
	// ImportStepEncodeAndSort encode source data and write sorted kv into global storage.
	ImportStepEncodeAndSort Step = 3
	// ImportStepMergeSort merge sorted kv from global storage, so we can have better
	// read performance during ImportStepWriteAndIngest.
	// Depending on how much the kv files overlap, there might be 0 subtasks
	// in this step.
	ImportStepMergeSort Step = 4
	// ImportStepWriteAndIngest write sorted kv into TiKV and ingest it.
	ImportStepWriteAndIngest Step = 5
	// ImportStepCollectConflicts collects conflicts info. This step won't mutate
	// downstream data, so it is idempotent, and we can collect a correct checksum
	// for the conflicted rows. If we did this together with
	// ImportStepConflictResolution, we couldn't get a correct checksum once the
	// step is retried in the middle.
	// This step also needs to deduplicate the conflicted rows caused by
	// multiple unique indexes to avoid repeated collection. Currently we do it
	// in memory, so if there are too many conflicts, we will skip the later
	// checksum step as we don't know the exact checksum.
	ImportStepCollectConflicts Step = 6
	// ImportStepConflictResolution resolves detected conflicts.
	// During other steps of global sort, we detect conflicts and record them
	// in external storage; if any conflicts are detected, we resolve them
	// here. So there might be 0 subtasks in this step.
	ImportStepConflictResolution Step = 7
)

Steps of IMPORT INTO. Each step is represented by one or multiple subtasks. The initial step is StepInit(-1); steps are processed in the following order:

  • local sort: StepInit -> ImportStepImport -> ImportStepPostProcess -> StepDone
  • global sort: StepInit -> ImportStepEncodeAndSort -> ImportStepMergeSort (optional) -> ImportStepWriteAndIngest -> ImportStepCollectConflicts (optional) -> ImportStepConflictResolution (optional) -> ImportStepPostProcess -> StepDone
// Steps of Add Index (backfill).
const (
	// BackfillStepReadIndex is the first business step after StepInit, in both
	// the local sort and the global sort flow.
	BackfillStepReadIndex Step = 1
	// BackfillStepMergeSort is only used in global sort. It merges sorted kv
	// from global storage, so we can have better read performance during
	// BackfillStepWriteAndIngest with global sort.
	// Whether it has subtasks depends on how much the kv files overlap: when
	// they overlap less than MergeSortOverlapThreshold, there are no subtasks.
	BackfillStepMergeSort Step = 2

	// BackfillStepWriteAndIngest write sorted kv into TiKV and ingest it.
	BackfillStepWriteAndIngest Step = 3

	// BackfillStepMergeTempIndex is the step to merge temp index into the original index.
	BackfillStepMergeTempIndex Step = 4
)

Steps of Add Index. Each step is represented by one or multiple subtasks. The initial step is StepInit(-1); steps are processed in the following order:

  • local sort: StepInit -> BackfillStepReadIndex -> StepDone
  • global sort: StepInit -> BackfillStepReadIndex -> BackfillStepMergeSort -> BackfillStepWriteAndIngest -> StepDone

type StepResource

// StepResource is the max resource that a task step can use, tracked as two
// separate Allocatable values: one for CPU and one for memory.
type StepResource struct {
	CPU *Allocatable
	Mem *Allocatable
}

StepResource is the max resource that a task step can use. it's also the max resource that a subtask can use, as we run subtasks of task step in sequence.

func (*StepResource) String

func (s *StepResource) String() string

String implements fmt.Stringer interface.

type Subtask

// Subtask is a SubtaskBase plus the fields that can be large or change during
// execution: the update time, the meta and the summary.
type Subtask struct {
	SubtaskBase
	// UpdateTime is the time when the subtask is updated.
	// It can be used as the subtask end time if the subtask is finished.
	// It's 0 if the subtask hasn't started yet.
	UpdateTime time.Time
	// Meta is the metadata of the subtask, and should not be nil.
	// Metas of different subtasks of the same step must be different too.
	// NOTE: this field can be changed by the StepExecutor.OnFinished method to
	// store some result, and the framework will update the subtask meta in the
	// storage. On other code paths, this field should be read-only.
	Meta    []byte
	Summary string
}

Subtask represents the subtask of the distributed framework. Subtasks of a task are run in parallel on different nodes, but on each node at most 1 subtask can be run at the same time; see StepExecutor too.

func NewSubtask

func NewSubtask(step Step, taskID int64, tp TaskType, execID string, concurrency int, meta []byte, ordinal int) *Subtask

NewSubtask creates a new subtask.

type SubtaskBase

// SubtaskBase contains the basic information of a subtask, without the
// subtask meta.
type SubtaskBase struct {
	ID   int64
	Step Step
	Type TaskType
	// TaskID is taken from task_key of the subtask table.
	TaskID int64
	State  SubtaskState
	// Concurrency is the concurrency of the subtask.
	// It's initialized as the task's required slots, and it's NOT used now.
	// If the required slots of the task are modified, the concurrency of the
	// unfinished subtasks of the task will be updated too.
	// Some subtasks, like post-process of import into, don't consume many
	// resources and could run with a lower value; this field can be used to
	// implement such a feature later.
	Concurrency int
	// ExecID is the ID of target executor, right now it's the same as instance_id,
	// its value is IP:PORT, see GenerateExecID
	ExecID     string
	CreateTime time.Time
	// StartTime is the time when the subtask is started.
	// it's 0 if it hasn't started yet.
	StartTime time.Time
	// Ordinal is the ordinal of the subtask. It should be unique within a
	// given task and step, and starts from 1.
	Ordinal int
}

SubtaskBase contains the basic information of a subtask. We define this to avoid loading the subtask meta, which might be very large, into memory.

func (*SubtaskBase) IsDone

func (t *SubtaskBase) IsDone() bool

IsDone checks if the subtask is done.

func (*SubtaskBase) String

func (t *SubtaskBase) String() string

type SubtaskState

type SubtaskState string

SubtaskState is the state of subtask.

// States of a subtask, see doc.go for more details.
const (
	SubtaskStatePending  SubtaskState = "pending"
	SubtaskStateRunning  SubtaskState = "running"
	SubtaskStateSucceed  SubtaskState = "succeed"
	SubtaskStateFailed   SubtaskState = "failed"
	SubtaskStateCanceled SubtaskState = "canceled"
	SubtaskStatePaused   SubtaskState = "paused"
)

see doc.go for more details.

func (SubtaskState) String

func (s SubtaskState) String() string

type Task

// Task is a TaskBase plus the task meta, timing, error and modification
// information.
type Task struct {
	TaskBase
	// SchedulerID is not used now.
	SchedulerID     string
	StartTime       time.Time
	StateUpdateTime time.Time
	// Meta is the metadata of the task. It's read-only in most cases, but it can
	// be changed in the cases below, and the framework will then update the task
	// meta in the storage:
	// 	- the task switches to the next step in Scheduler.OnNextSubtasksBatch.
	// 	- on task cleanup, we might do some redaction on the meta.
	// 	- on task 'modifying', params inside the meta can be changed.
	Meta        []byte
	Error       error
	ModifyParam ModifyParam
}

Task represents the task of distributed framework, see doc.go for more details.

type TaskBase

// TaskBase contains the basic information of a task, without the task meta.
type TaskBase struct {
	ID    int64
	Key   string
	Type  TaskType
	State TaskState
	Step  Step
	// Priority is the priority of the task; a smaller value means a higher
	// priority. The valid range is [1, 1024], and the default is NormalPriority.
	Priority int
	// RequiredSlots is the required slots of the task.
	// We use this field to allocate slots when scheduling and creating the task
	// executor, but the effective slots when running the task are determined by
	// GetRuntimeSlots.
	// In the normal case they are the same, but when meeting OOM and TiDB
	// repeatedly restarts, we might set a lower MaxRuntimeSlots in ExtraParams;
	// then the effective slots are smaller than RequiredSlots.
	// Note: in the application layer, don't use this field directly; use
	// GetRuntimeSlots or GetResource of the step executor instead.
	// Note: in the system table, we store it inside the 'concurrency' column, as
	// required slots were introduced later.
	RequiredSlots int
	// TargetScope indicates that the task should run on TiDB nodes which
	// contain the tidb_service_scope=TargetScope label.
	// To be compatible with previous versions, if it's "" or "background", the
	// task tries to run on nodes of "background" scope;
	// if there are no such nodes, it will try nodes of "" scope.
	TargetScope  string
	CreateTime   time.Time
	MaxNodeCount int
	ExtraParams  ExtraParams
	// Keyspace is the name of the keyspace that the task belongs to.
	// It's only useful for nextgen clusters.
	Keyspace string
}

TaskBase contains the basic information of a task. We define this to avoid loading the task meta, which might be very large, into memory.

func (*TaskBase) Compare

func (t *TaskBase) Compare(other *TaskBase) int

Compare compares two tasks by task rank. A return value < 0 means the rank of t is higher than that of 'other'.

func (*TaskBase) CompareTask

func (t *TaskBase) CompareTask(other *Task) int

CompareTask is a wrapper of Compare.

func (*TaskBase) GetRuntimeSlots

func (t *TaskBase) GetRuntimeSlots() int

GetRuntimeSlots gets the runtime slots of current task step. application layer might use this as the concurrency of the task step.

func (*TaskBase) IsDone

func (t *TaskBase) IsDone() bool

IsDone checks if the task is done.

func (*TaskBase) String

func (t *TaskBase) String() string

String implements fmt.Stringer interface.

type TaskState

type TaskState string

TaskState is the state of task.

// States of a task, see doc.go for more details.
const (
	TaskStatePending            TaskState = "pending"
	TaskStateRunning            TaskState = "running"
	TaskStateSucceed            TaskState = "succeed"
	TaskStateFailed             TaskState = "failed"
	TaskStateReverting          TaskState = "reverting"
	TaskStateAwaitingResolution TaskState = "awaiting-resolution"
	TaskStateReverted           TaskState = "reverted"
	TaskStateCancelling         TaskState = "cancelling"
	TaskStatePausing            TaskState = "pausing"
	TaskStatePaused             TaskState = "paused"
	TaskStateResuming           TaskState = "resuming"
	TaskStateModifying          TaskState = "modifying"
)

see doc.go for more details.

func (TaskState) CanMoveToModifying

func (s TaskState) CanMoveToModifying() bool

CanMoveToModifying checks if current state can move to 'modifying' state.

func (TaskState) String

func (s TaskState) String() string

type TaskType

type TaskType string

TaskType is the type of task.

// Known task types.
// NOTE(review): the string values use inconsistent casing ("Example",
// "ImportInto", "backfill"); they are presumably persisted, so do not
// normalize them without checking compatibility.
const (
	// TaskTypeExample is TaskType of Example, it's for test.
	TaskTypeExample TaskType = "Example"
	// ImportInto is TaskType of ImportInto.
	ImportInto TaskType = "ImportInto"
	// Backfill is TaskType of the add-index backfilling process.
	Backfill TaskType = "backfill"
)

func Int2Type

func Int2Type(i int) TaskType

Int2Type converts int to task type.

func (TaskType) String

func (t TaskType) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL