Documentation
¶
Index ¶
- Constants
- type Elem
- type Op
- type TaskOp
- type TaskOpTuneTriggers
- type TaskRun
- func (tr *TaskRun) AddTaskConcurrencyCount(add int, overwriteOpt ...bool)
- func (tr *TaskRun) AppendLastMsg(msg string) error
- func (tr *TaskRun) CalibrateConcurrencyCountFromDB()
- func (tr *TaskRun) Do(itr TaskOp) error
- func (tr *TaskRun) EnsureFetchLatestPipelineStatus()
- func (tr *TaskRun) GetActionSpec() apistructs.ActionSpec
- func (tr *TaskRun) GetConcurrencyLimit(concurrency *apistructs.ActionConcurrency) int
- func (tr *TaskRun) GetTaskConcurrencyCount() int
- func (tr *TaskRun) LogStep(taskOp Op, step string)
- func (tr *TaskRun) Teardown()
- func (tr *TaskRun) TeardownConcurrencyCount()
- func (tr *TaskRun) TeardownPriorityQueue()
- func (tr *TaskRun) Update()
- func (tr *TaskRun) UpdateTaskInspect(inspect string) error
Constants ¶
View Source
const NoConcurrencyLimit = -1
-1 代表无限制
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Elem ¶
type Elem struct {
TimeoutCh <-chan struct{}
Cancel context.CancelFunc
Timeout time.Duration
ErrCh chan error
DoneCh chan interface{}
ExitCh chan struct{}
}
type TaskOp ¶
type TaskOp interface {
Op() Op
TaskRun() *TaskRun
// Processing represents what the `op` really do, you shouldn't update task inside `processing`.
// you should put update logic in `WhenXXX`.
Processing() (interface{}, error)
// WhenDone will be invoked if task op is done.
WhenDone(data interface{}) error
// WhenLogicError will be invoked if task occurred an error when do logic.
WhenLogicError(err error) error
// WhenTimeout will be invoked if task is timeout.
WhenTimeout() error
TimeoutConfig() (<-chan struct{}, context.CancelFunc, time.Duration)
// TuneTriggers return corresponding triggers at concrete tune point.
TuneTriggers() TaskOpTuneTriggers
}
type TaskOpTuneTriggers ¶
type TaskOpTuneTriggers struct {
BeforeProcessing aoptypes.TuneTrigger
AfterProcessing aoptypes.TuneTrigger
}
type TaskRun ¶
type TaskRun struct {
Task *spec.PipelineTask
Ctx context.Context
Executor types.ActionExecutor
Throttler throttler.Throttler
P *spec.Pipeline
QueriedPipelineStatus apistructs.PipelineStatus
Bdl *bundle.Bundle
DBClient *dbclient.Client
Js jsonstore.JsonStore
QuitQueueTimeout bool
QuitWaitTimeout bool
StopQueueLoop bool
StopWaitLoop bool
PExitCh <-chan struct{}
PExitChCancel context.CancelFunc
PExit bool
ExecutorDoneCh chan interface{}
// 轮训状态间隔期间可能任务已经是终态,FakeTimeout = true
FakeTimeout bool
// svc
ActionAgentSvc *actionagentsvc.ActionAgentSvc
ExtMarketSvc *extmarketsvc.ExtMarketSvc
}
TaskRun represents task runtime.
func New ¶
func New(ctx context.Context, task *spec.PipelineTask, pExitCh <-chan struct{}, pExitChCancel context.CancelFunc, throttler throttler.Throttler, executor types.ActionExecutor, p *spec.Pipeline, bdl *bundle.Bundle, dbClient *dbclient.Client, js jsonstore.JsonStore, actionAgentSvc *actionagentsvc.ActionAgentSvc, extMarketSvc *extmarketsvc.ExtMarketSvc, ) *TaskRun
New returns a TaskRun.
func (*TaskRun) AddTaskConcurrencyCount ¶
func (*TaskRun) AppendLastMsg ¶
func (*TaskRun) CalibrateConcurrencyCountFromDB ¶
func (tr *TaskRun) CalibrateConcurrencyCountFromDB()
CalibrateConcurrencyCountFromDB 从数据库校准并发度
func (*TaskRun) EnsureFetchLatestPipelineStatus ¶
func (tr *TaskRun) EnsureFetchLatestPipelineStatus()
func (*TaskRun) GetActionSpec ¶
func (tr *TaskRun) GetActionSpec() apistructs.ActionSpec
func (*TaskRun) GetConcurrencyLimit ¶
func (tr *TaskRun) GetConcurrencyLimit(concurrency *apistructs.ActionConcurrency) int
func (*TaskRun) GetTaskConcurrencyCount ¶
func (*TaskRun) LogStep ¶
reconciler: pipelineID: 1, taskID: 1, taskName: repo, taskOp: start, step: begin do WhenDone
func (*TaskRun) TeardownConcurrencyCount ¶
func (tr *TaskRun) TeardownConcurrencyCount()
func (*TaskRun) TeardownPriorityQueue ¶
func (tr *TaskRun) TeardownPriorityQueue()
func (*TaskRun) UpdateTaskInspect ¶ added in v1.2.0
UpdateTaskInspect update task inspect, and get events from inspect
Source Files
¶
Click to show internal directories.
Click to hide internal directories.