Documentation
      ¶
    
    
  
    
  
    Index ¶
- Constants
 - Variables
 - func IsSameScope(left, right *common.ID) bool
 - func JobName(jt JobType) string
 - func NewParallelJobScheduler(parallism int) *parallelJobScheduler
 - func NewPoolHandler(ctx context.Context, num int) *poolHandler
 - func NewSingleWorkerHandler(ctx context.Context, name string) *singleWorkerHandler
 - func NextTaskId() uint64
 - func RegisterJobType(jt JobType, jn string)
 - func RegisterType(t TaskType, name string)
 - func TaskName(t TaskType) string
 - type BaseDispatcher
 - type BaseScheduler
 - type BaseScopedDispatcher
 - type BaseTask
 - type BaseTaskHandler
 - type Context
 - type Dispatcher
 - type FnTask
 - type FuncT
 - type Job
 - func (job *Job) Close()
 - func (job *Job) DoneWithErr(err error)
 - func (job *Job) GetResult() *JobResult
 - func (job *Job) ID() string
 - func (job *Job) Init(ctx context.Context, id string, typ JobType, exec JobExecutor)
 - func (job *Job) Reset()
 - func (job *Job) Run()
 - func (job *Job) String() string
 - func (job *Job) Type() JobType
 - func (job *Job) WaitDone() *JobResult
 
- type JobExecutor
 - type JobResult
 - type JobScheduler
 - type JobType
 - type MScopedTask
 - type MultiScopedFnTask
 - type Scheduler
 - type ScopedFnTask
 - type ScopedTask
 - type ScopedTaskSharder
 - type Task
 - type TaskHandler
 - type TaskScheduler
 - type TaskType
 - type TxnTaskFactory
 
Constants ¶
      View Source
      
  
const (
	JTAny JobType = iota
	JTCustomizedStart = 100
	JTInvalid = 10000
)
Variables ¶
      View Source
      
  
    var (
	ErrDispatcherNotFound = moerr.NewInternalErrorNoCtx("tae sched: dispatcher not found")
	ErrSchedule = moerr.NewInternalErrorNoCtx("tae sched: cannot schedule")
)
      View Source
      
  
    var (
	ErrBadTaskRequestPara = moerr.NewInternalErrorNoCtx("tae scheduler: bad task request parameters")
	ErrScheduleScopeConflict = moerr.NewInternalErrorNoCtx("tae scheduler: scope conflict")
)
      View Source
      
  
    var DefaultScopeSharder = func(scope *common.ID) int {
	if scope == nil {
		return 0
	}
	hasher := fnv.New64a()
	hasher.Write(types.EncodeUint64(&scope.TableID))
	hasher.Write(types.EncodeUuid(scope.SegmentID()))
	return int(hasher.Sum64())
}
      View Source
      
  
    var (
	ErrDispatchWrongTask = moerr.NewInternalErrorNoCtx("tae: wrong task type")
)
    
      View Source
      
  
    var (
	ErrTaskHandleEnqueue = moerr.NewInternalErrorNoCtx("tae: task handle enqueue")
)
    
      View Source
      
  
    var SerialJobScheduler = new(simpleJobSceduler)
    
      View Source
      
  
var WaitableCtx = &Context{Waitable: true}
    Functions ¶
func IsSameScope ¶
func NewParallelJobScheduler ¶ added in v0.7.0
func NewParallelJobScheduler(parallism int) *parallelJobScheduler
func NewPoolHandler ¶
func NewSingleWorkerHandler ¶
func NextTaskId ¶
func NextTaskId() uint64
func RegisterJobType ¶ added in v0.8.0
func RegisterType ¶
Types ¶
type BaseDispatcher ¶
type BaseDispatcher struct {
	// contains filtered or unexported fields
}
    func NewBaseDispatcher ¶
func NewBaseDispatcher() *BaseDispatcher
func (*BaseDispatcher) Close ¶
func (d *BaseDispatcher) Close() error
func (*BaseDispatcher) Dispatch ¶
func (d *BaseDispatcher) Dispatch(task Task)
func (*BaseDispatcher) RegisterHandler ¶
func (d *BaseDispatcher) RegisterHandler(t TaskType, h TaskHandler)
type BaseScheduler ¶
type BaseScheduler struct {
	ops.OpWorker
	Dispatchers map[TaskType]Dispatcher
	// contains filtered or unexported fields
}
    func NewBaseScheduler ¶
func NewBaseScheduler(ctx context.Context, name string) *BaseScheduler
func (*BaseScheduler) RegisterDispatcher ¶
func (s *BaseScheduler) RegisterDispatcher(t TaskType, dispatcher Dispatcher)
func (*BaseScheduler) Schedule ¶
func (s *BaseScheduler) Schedule(task Task) error
func (*BaseScheduler) Stop ¶
func (s *BaseScheduler) Stop()
type BaseScopedDispatcher ¶
type BaseScopedDispatcher struct {
	// contains filtered or unexported fields
}
    func NewBaseScopedDispatcher ¶
func NewBaseScopedDispatcher(sharder ScopedTaskSharder) *BaseScopedDispatcher
func (*BaseScopedDispatcher) AddHandle ¶
func (d *BaseScopedDispatcher) AddHandle(h TaskHandler)
func (*BaseScopedDispatcher) Close ¶
func (d *BaseScopedDispatcher) Close() error
func (*BaseScopedDispatcher) Dispatch ¶
func (d *BaseScopedDispatcher) Dispatch(task Task)
type BaseTaskHandler ¶
func NewBaseEventHandler ¶
func NewBaseEventHandler(ctx context.Context, name string) *BaseTaskHandler
func (*BaseTaskHandler) Close ¶
func (h *BaseTaskHandler) Close() error
func (*BaseTaskHandler) Enqueue ¶
func (h *BaseTaskHandler) Enqueue(task Task)
func (*BaseTaskHandler) Execute ¶
func (h *BaseTaskHandler) Execute(task Task)
type Dispatcher ¶
type Job ¶ added in v0.7.0
type Job struct {
	// contains filtered or unexported fields
}
    func (*Job) DoneWithErr ¶ added in v0.8.0
type JobExecutor ¶ added in v0.7.0
type JobScheduler ¶ added in v0.7.0
type MScopedTask ¶
type MultiScopedFnTask ¶
type MultiScopedFnTask struct {
	*FnTask
	// contains filtered or unexported fields
}
    func NewMultiScopedFnTask ¶
func (*MultiScopedFnTask) Scopes ¶
func (task *MultiScopedFnTask) Scopes() []common.ID
type ScopedFnTask ¶
type ScopedFnTask struct {
	*FnTask
	// contains filtered or unexported fields
}
    func NewScopedFnTask ¶
func (*ScopedFnTask) Scope ¶
func (task *ScopedFnTask) Scope() *common.ID
type ScopedTask ¶
type ScopedTaskSharder ¶
type TaskScheduler ¶
type TaskScheduler interface {
	Scheduler
	ScheduleTxnTask(ctx *Context, taskType TaskType, factory TxnTaskFactory) (Task, error)
	ScheduleMultiScopedTxnTask(ctx *Context, taskType TaskType, scopes []common.ID, factory TxnTaskFactory) (Task, error)
	ScheduleMultiScopedFn(ctx *Context, taskType TaskType, scopes []common.ID, fn FuncT) (Task, error)
	ScheduleFn(ctx *Context, taskType TaskType, fn func() error) (Task, error)
	ScheduleScopedFn(ctx *Context, taskType TaskType, scope *common.ID, fn func() error) (Task, error)
	GetCheckpointedLSN() uint64
	GetPenddingLSNCnt() uint64
	GetCheckpointTS() types.TS
}
    
      
      Source Files
      ¶
    
   Click to show internal directories. 
   Click to hide internal directories.