tasks

package
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 18, 2023 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrDispatcherNotFound = moerr.NewInternalErrorNoCtx("tae sched: dispatcher not found")
	ErrSchedule           = moerr.NewInternalErrorNoCtx("tae sched: cannot schedule")
)
View Source
var (
	ErrBadTaskRequestPara    = moerr.NewInternalErrorNoCtx("tae scheduler: bad task request parameters")
	ErrScheduleScopeConflict = moerr.NewInternalErrorNoCtx("tae scheduler: scope conflict")
)
View Source
var DefaultScopeSharder = func(scope *common.ID) int {
	if scope == nil {
		return 0
	}
	return int(scope.TableID + scope.SegmentID)
}
View Source
var (
	ErrDispatchWrongTask = moerr.NewInternalErrorNoCtx("tae: wrong task type")
)
View Source
var (
	ErrTaskHandleEnqueue = moerr.NewInternalErrorNoCtx("tae: task handle enqueue")
)
View Source
var SerialJobScheduler = new(simpleJobSceduler)
View Source
var WaitableCtx = &Context{Waitable: true}

Functions

func IsSameScope

func IsSameScope(left, right *common.ID) bool

func NewParallelJobScheduler added in v0.7.0

func NewParallelJobScheduler(parallism int) *parallelJobScheduler

func NewPoolHandler

func NewPoolHandler(num int) *poolHandler

func NewSingleWorkerHandler

func NewSingleWorkerHandler(name string) *singleWorkerHandler

func NextTaskId

func NextTaskId() uint64

func RegisterType

func RegisterType(t TaskType, name string)

func TaskName

func TaskName(t TaskType) string

Types

type BaseDispatcher

type BaseDispatcher struct {
	// contains filtered or unexported fields
}

func NewBaseDispatcher

func NewBaseDispatcher() *BaseDispatcher

func (*BaseDispatcher) Close

func (d *BaseDispatcher) Close() error

func (*BaseDispatcher) Dispatch

func (d *BaseDispatcher) Dispatch(task Task)

func (*BaseDispatcher) RegisterHandler

func (d *BaseDispatcher) RegisterHandler(t TaskType, h TaskHandler)

type BaseScheduler

type BaseScheduler struct {
	ops.OpWorker

	Dispatchers map[TaskType]Dispatcher
	// contains filtered or unexported fields
}

func NewBaseScheduler

func NewBaseScheduler(name string) *BaseScheduler

func (*BaseScheduler) RegisterDispatcher

func (s *BaseScheduler) RegisterDispatcher(t TaskType, dispatcher Dispatcher)

func (*BaseScheduler) Schedule

func (s *BaseScheduler) Schedule(task Task) error

func (*BaseScheduler) Stop

func (s *BaseScheduler) Stop()

type BaseScopedDispatcher

type BaseScopedDispatcher struct {
	// contains filtered or unexported fields
}

func NewBaseScopedDispatcher

func NewBaseScopedDispatcher(sharder ScopedTaskSharder) *BaseScopedDispatcher

func (*BaseScopedDispatcher) AddHandle

func (d *BaseScopedDispatcher) AddHandle(h TaskHandler)

func (*BaseScopedDispatcher) Close

func (d *BaseScopedDispatcher) Close() error

func (*BaseScopedDispatcher) Dispatch

func (d *BaseScopedDispatcher) Dispatch(task Task)

type BaseTask

type BaseTask struct {
	ops.Op
	// contains filtered or unexported fields
}

func NewBaseTask

func NewBaseTask(impl Task, taskType TaskType, ctx *Context) *BaseTask

func (*BaseTask) Cancel

func (task *BaseTask) Cancel() (err error)

func (*BaseTask) Execute

func (task *BaseTask) Execute() (err error)

func (*BaseTask) ID

func (task *BaseTask) ID() uint64

func (*BaseTask) Name

func (task *BaseTask) Name() string

func (*BaseTask) Type

func (task *BaseTask) Type() TaskType

type BaseTaskHandler

type BaseTaskHandler struct {
	ops.OpWorker
}

func NewBaseEventHandler

func NewBaseEventHandler(name string) *BaseTaskHandler

func (*BaseTaskHandler) Close

func (h *BaseTaskHandler) Close() error

func (*BaseTaskHandler) Enqueue

func (h *BaseTaskHandler) Enqueue(task Task)

func (*BaseTaskHandler) Execute

func (h *BaseTaskHandler) Execute(task Task)

type Context

type Context struct {
	DoneCB   ops.OpDoneCB
	Waitable bool
}

type Dispatcher

type Dispatcher interface {
	io.Closer
	Dispatch(Task)
}

type FnTask

type FnTask struct {
	*BaseTask
	Fn FuncT
}

func NewFnTask

func NewFnTask(ctx *Context, taskType TaskType, fn FuncT) *FnTask

func (*FnTask) Execute

func (task *FnTask) Execute() error

type FuncT

type FuncT = func() error

type Job added in v0.7.0

type Job struct {
	// contains filtered or unexported fields
}

func NewJob added in v0.7.0

func NewJob(id string, ctx context.Context, exec JobExecutor) *Job

func (*Job) Close added in v0.7.0

func (job *Job) Close()

func (*Job) GetResult added in v0.7.0

func (job *Job) GetResult() *JobResult

func (*Job) Run added in v0.7.0

func (job *Job) Run()

func (*Job) WaitDone added in v0.7.0

func (job *Job) WaitDone() *JobResult

type JobExecutor added in v0.7.0

type JobExecutor = func(context.Context) *JobResult

type JobResult added in v0.7.0

type JobResult struct {
	Err error
	Res any
}

type JobScheduler added in v0.7.0

type JobScheduler interface {
	Schedule(job *Job) error
	Stop()
}

type MScopedTask

type MScopedTask interface {
	Task
	Scopes() []common.ID
}

type MultiScopedFnTask

type MultiScopedFnTask struct {
	*FnTask
	// contains filtered or unexported fields
}

func NewMultiScopedFnTask

func NewMultiScopedFnTask(ctx *Context, taskType TaskType, scopes []common.ID, fn FuncT) *MultiScopedFnTask

func (*MultiScopedFnTask) Scopes

func (task *MultiScopedFnTask) Scopes() []common.ID

type Scheduler

type Scheduler interface {
	Start()
	Stop()
	Schedule(Task) error
}

type ScopedFnTask

type ScopedFnTask struct {
	*FnTask
	// contains filtered or unexported fields
}

func NewScopedFnTask

func NewScopedFnTask(ctx *Context, taskType TaskType, scope *common.ID, fn FuncT) *ScopedFnTask

func (*ScopedFnTask) Scope

func (task *ScopedFnTask) Scope() *common.ID

type ScopedTask

type ScopedTask interface {
	Task
	Scope() *common.ID
}

type ScopedTaskSharder

type ScopedTaskSharder = func(scope *common.ID) int

type Task

type Task interface {
	base.IOp
	ID() uint64
	Type() TaskType
	Cancel() error
	Name() string
}

type TaskHandler

type TaskHandler interface {
	io.Closer
	Start()
	Enqueue(Task)
	Execute(Task)
}

type TaskScheduler

type TaskScheduler interface {
	Scheduler
	ScheduleTxnTask(ctx *Context, taskType TaskType, factory TxnTaskFactory) (Task, error)
	ScheduleMultiScopedTxnTask(ctx *Context, taskType TaskType, scopes []common.ID, factory TxnTaskFactory) (Task, error)
	ScheduleMultiScopedFn(ctx *Context, taskType TaskType, scopes []common.ID, fn FuncT) (Task, error)
	ScheduleFn(ctx *Context, taskType TaskType, fn func() error) (Task, error)
	ScheduleScopedFn(ctx *Context, taskType TaskType, scope *common.ID, fn func() error) (Task, error)
	Checkpoint(indexes []*wal.Index) error

	AddTransferPage(*model.TransferHashPage) error
	DeleteTransferPage(id *common.ID) error

	GetCheckpointedLSN() uint64
	GetPenddingLSNCnt() uint64
	GetGCTS() types.TS
	GetCheckpointTS() types.TS
}

type TaskType

type TaskType uint16
const (
	NoopTask TaskType = iota
	MockTask
	CustomizedTask

	DataCompactionTask
	CheckpointTask
	GCTask
	IOTask
)

type TxnTaskFactory

type TxnTaskFactory = func(ctx *Context, txn txnif.AsyncTxn) (Task, error)

Directories

Path Synopsis
ops

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL