Documentation
¶
Index ¶
- Constants
- Variables
- func CtxWithRunningTaskIns(ctx context.Context, task *TaskInstance) context.Context
- type ActiveAction
- type BaseInfo
- type BaseInfoGetter
- type Check
- type Command
- type CommandName
- type CtxKey
- type Dag
- type DagInstance
- func (dagIns *DagInstance) Block(reason string)
- func (dagIns *DagInstance) CanModifyStatus() bool
- func (dagIns *DagInstance) Cancel(taskInsIds []string) error
- func (dagIns *DagInstance) Continue(taskInsIds []string) error
- func (dagIns *DagInstance) Fail(reason string)
- func (dagIns *DagInstance) Retry(taskInsIds []string) error
- func (dagIns *DagInstance) Run()
- func (dagIns *DagInstance) Success()
- func (dagIns *DagInstance) VarsGetter() utils.KeyValueGetter
- func (dagIns *DagInstance) VarsIterator() utils.KeyValueIterator
- type DagInstanceHookFunc
- type DagInstanceLifecycleHook
- type DagInstanceStatus
- type DagInstanceVar
- type DagInstanceVars
- type DagStatus
- type DagVar
- type DagVars
- type MockBaseInfoGetter
- type Operator
- type PreChecks
- type ShareData
- func (d *ShareData) Get(key string) (string, bool)
- func (d *ShareData) MarshalBSON() ([]byte, error)
- func (d *ShareData) MarshalJSON() ([]byte, error)
- func (d *ShareData) Set(key string, val string)
- func (d *ShareData) UnmarshalBSON(data []byte) error
- func (d *ShareData) UnmarshalJSON(data []byte) error
- type SpecifiedVar
- type Task
- type TaskCondition
- type TaskConditionSource
- type TaskInstance
- func (t *TaskInstance) DoPreCheck(dagIns *DagInstance) (isActive bool, err error)
- func (t *TaskInstance) GetDepend() []string
- func (t *TaskInstance) GetGraphID() string
- func (t *TaskInstance) GetID() string
- func (t *TaskInstance) GetStatus() TaskInstanceStatus
- func (t *TaskInstance) InitialDep(ctx run.ExecuteContext, patch func(*TaskInstance) error, dagIns *DagInstance)
- func (t *TaskInstance) Run(params interface{}, act run.Action) (err error)
- func (t *TaskInstance) SetStatus(s TaskInstanceStatus) error
- func (t *TaskInstance) Trace(msg string, ops ...run.TraceOp)
- type TaskInstanceStatus
- type TraceInfo
- type Trigger
Constants ¶
// Command names. Each value names one of the operations (retry, cancel,
// continue) that can be requested on task instances of a dag instance.
const (
	CommandNameRetry    = "retry"
	CommandNameCancel   = "cancel"
	CommandNameContinue = "continue"
)
Variables ¶
// StoreMarshal and StoreUnmarshal are package-level serialization hooks.
// Both are nil until assigned.
// NOTE(review): presumably set by the store implementation and used when
// persisting entities such as ShareData — confirm against the callers.
var (
	StoreMarshal   func(interface{}) ([]byte, error)
	StoreUnmarshal func([]byte, interface{}) error
)
Functions ¶
func CtxWithRunningTaskIns ¶
func CtxWithRunningTaskIns(ctx context.Context, task *TaskInstance) context.Context
Types ¶
type ActiveAction ¶
// ActiveAction names what happens to an action when all conditions of a
// Check are met.
type ActiveAction string

const (
	// ActiveActionSkip skips the action when all conditions are met;
	// otherwise the action is executed.
	ActiveActionSkip ActiveAction = "skip"
	// ActiveActionBlock blocks the action when all conditions are met;
	// otherwise the action is executed.
	ActiveActionBlock ActiveAction = "block"
)
type BaseInfo ¶
// BaseInfo holds the identity and timestamp fields that Dag, DagInstance and
// TaskInstance embed.
type BaseInfo struct {
// ID is serialized as "id" in YAML/JSON and as "_id" in BSON.
ID string `yaml:"id" json:"id" bson:"_id"`
// CreatedAt is the creation time as an int64.
// NOTE(review): presumably a Unix timestamp; the unit is not shown here — confirm.
CreatedAt int64 `yaml:"createdAt" json:"createdAt" bson:"createdAt"`
// UpdatedAt is the last-update time, in the same representation as CreatedAt.
UpdatedAt int64 `yaml:"updatedAt" json:"updatedAt" bson:"updatedAt"`
}
BaseInfo
type Check ¶
// Check pairs a list of conditions with the action to apply when they are met.
type Check struct {
// Conditions are the TaskConditions this check is made of.
Conditions []TaskCondition `yaml:"conditions,omitempty" json:"conditions,omitempty" bson:"conditions,omitempty"`
// Act is the action applied when all conditions are met
// (ActiveActionSkip or ActiveActionBlock); otherwise the task is executed.
Act ActiveAction `yaml:"act,omitempty" json:"act,omitempty" bson:"act,omitempty"`
}
Check
func (*Check) IsMeet ¶
func (c *Check) IsMeet(dagIns *DagInstance) bool
IsMeet reports whether the check is met.
type Dag ¶
// Dag is a DAG definition: a named list of Tasks together with its variables,
// status and versioning fields. It can be serialized as YAML, JSON or BSON.
type Dag struct {
// BaseInfo supplies ID, CreatedAt and UpdatedAt.
BaseInfo `yaml:",inline" json:",inline" bson:"inline"`
// Name is the name of the dag.
Name string `yaml:"name,omitempty" json:"name,omitempty" bson:"name,omitempty"`
// Desc is a free-form description.
Desc string `yaml:"desc,omitempty" json:"desc,omitempty" bson:"desc,omitempty"`
// NOTE(review): presumably a cron expression used to schedule the dag — confirm.
Cron string `yaml:"cron,omitempty" json:"cron,omitempty" bson:"cron,omitempty"`
// Vars are the variables declared by the dag (see DagVar).
Vars DagVars `yaml:"vars,omitempty" json:"vars,omitempty" bson:"vars,omitempty"`
// Status is the dag's status (see DagStatus).
Status DagStatus `yaml:"status,omitempty" json:"status,omitempty" bson:"status,omitempty"`
// Tasks are the task definitions that make up the dag.
Tasks []Task `yaml:"tasks,omitempty" json:"tasks,omitempty" bson:"tasks,omitempty"`
// NOTE(review): the meaning of BntID is not evident from this file — confirm.
BntID string `yaml:"bntId,omitempty" json:"bntId,omitempty" bson:"bntId,omitempty"`
// NOTE(review): ResourceVersion and ValidVersionSeq look like versioning
// fields; their exact semantics are not shown here — confirm.
ResourceVersion string `yaml:"resourceVersion,omitempty" json:"resourceVersion,omitempty" bson:"resourceVersion,omitempty"`
ValidVersionSeq uint64 `yaml:"validVersionSeq" json:"validVersionSeq" bson:"validVersionSeq"`
}
Dag
type DagInstance ¶
// DagInstance is one instance of a Dag, identified by DagID, together with its
// variables, current status and any pending command.
type DagInstance struct {
// BaseInfo supplies ID, CreatedAt and UpdatedAt.
BaseInfo `bson:"inline"`
// DagID is the ID of the Dag this instance belongs to.
DagID string `json:"dagId,omitempty" bson:"dagId,omitempty"`
// Trigger records how the instance was triggered (see Trigger).
Trigger Trigger `json:"trigger,omitempty" bson:"trigger,omitempty"`
// NOTE(review): presumably identifies the worker handling this instance — confirm.
Worker string `json:"worker,omitempty" bson:"worker,omitempty"`
// Vars are the instance's variable values (see DagInstanceVar).
Vars DagInstanceVars `json:"vars,omitempty" bson:"vars,omitempty"`
// Status is one of the DagInstanceStatus constants.
Status DagInstanceStatus `json:"status,omitempty" bson:"status,omitempty"`
// Reason is a free-form reason string; Block and Fail each take a reason argument.
Reason string `json:"reason,omitempty" bson:"reason,omitempty"`
// Cmd is the pending command, if any. Retry, Cancel and Continue only set a
// command; it is executed later by the Parser.
Cmd *Command `json:"cmd,omitempty" bson:"cmd,omitempty"`
}
DagInstance
func (*DagInstance) CanModifyStatus ¶
func (dagIns *DagInstance) CanModifyStatus() bool
CanModifyStatus reports whether the dag instance's status can be modified.
func (*DagInstance) Cancel ¶
func (dagIns *DagInstance) Cancel(taskInsIds []string) error
Cancel cancels tasks. It only sets a command; the command is executed by the Parser.
func (*DagInstance) Continue ¶
func (dagIns *DagInstance) Continue(taskInsIds []string) error
Continue continues tasks. It only sets a command; the command is executed by the Parser.
func (*DagInstance) Retry ¶
func (dagIns *DagInstance) Retry(taskInsIds []string) error
Retry retries tasks. It only sets a command; the command is executed by the Parser.
func (*DagInstance) VarsGetter ¶
func (dagIns *DagInstance) VarsGetter() utils.KeyValueGetter
VarsGetter
func (*DagInstance) VarsIterator ¶
func (dagIns *DagInstance) VarsIterator() utils.KeyValueIterator
VarsIterator
type DagInstanceHookFunc ¶
type DagInstanceHookFunc func(dagIns *DagInstance)
type DagInstanceLifecycleHook ¶
// DagInstanceLifecycleHook groups optional callbacks, each receiving the
// affected *DagInstance.
// NOTE(review): judging by the names, each hook is presumably invoked before the
// corresponding DagInstance transition (Run, Success, Fail, Block, Retry,
// Continue) — confirm against the method bodies.
type DagInstanceLifecycleHook struct {
BeforeRun DagInstanceHookFunc
BeforeSuccess DagInstanceHookFunc
BeforeFail DagInstanceHookFunc
BeforeBlock DagInstanceHookFunc
BeforeRetry DagInstanceHookFunc
BeforeContinue DagInstanceHookFunc
}
DagInstanceLifecycleHook
var (
// HookDagInstance is the package-level set of DagInstance lifecycle hooks.
// Its zero value has every hook unset (nil).
HookDagInstance DagInstanceLifecycleHook
)
type DagInstanceStatus ¶
type DagInstanceStatus string
DagInstanceStatus
const ( DagInstanceStatusInit DagInstanceStatus = "init" DagInstanceStatusScheduled DagInstanceStatus = "scheduled" DagInstanceStatusRunning DagInstanceStatus = "running" DagInstanceStatusBlocked DagInstanceStatus = "blocked" DagInstanceStatusFailed DagInstanceStatus = "failed" DagInstanceStatusSuccess DagInstanceStatus = "success" )
type DagInstanceVar ¶
// DagInstanceVar is the value of one variable of a DagInstance.
type DagInstanceVar struct {
// Value is the variable's value as a string.
Value string `json:"value,omitempty" bson:"value,omitempty"`
}
DagInstanceVar
type DagVar ¶
// DagVar declares one variable of a Dag.
type DagVar struct {
// Desc is a free-form description of the variable.
Desc string `yaml:"desc,omitempty" json:"desc,omitempty" bson:"desc,omitempty"`
// DefaultValue is the variable's default value as a string.
DefaultValue string `yaml:"defaultValue,omitempty" json:"defaultValue,omitempty" bson:"defaultValue,omitempty"`
}
DagVar
type MockBaseInfoGetter ¶
MockBaseInfoGetter is an autogenerated mock type for the BaseInfoGetter type
func (*MockBaseInfoGetter) GetBaseInfo ¶
func (_m *MockBaseInfoGetter) GetBaseInfo() *BaseInfo
GetBaseInfo provides a mock function with given fields:
type ShareData ¶
// ShareData is an opaque string-to-string store: its fields are unexported and
// values are accessed through Get and Set. It implements custom JSON and BSON
// marshalling.
type ShareData struct {
// contains filtered or unexported fields
}
ShareData can be read and written by all tasks, and it is persisted. If you only need high-performance sharing within a single task, use the ExecuteContext's Context instead.
func (*ShareData) MarshalBSON ¶
MarshalBSON used by mongo
func (*ShareData) MarshalJSON ¶
MarshalJSON used by json
func (*ShareData) UnmarshalBSON ¶
UnmarshalBSON used by mongo
func (*ShareData) UnmarshalJSON ¶
UnmarshalJSON used by json
type Task ¶
// Task is the definition of one task inside a Dag.
type Task struct {
// ID identifies the task.
ID string `yaml:"id,omitempty" json:"id,omitempty" bson:"id,omitempty"`
// Name is the task's name.
Name string `yaml:"name,omitempty" json:"name,omitempty" bson:"name,omitempty"`
// DependOn lists the tasks this task depends on.
// NOTE(review): presumably task IDs — confirm.
DependOn []string `yaml:"dependOn,omitempty" json:"dependOn,omitempty" bson:"dependOn,omitempty"`
// ActionName names the action the task runs.
ActionName string `yaml:"actionName,omitempty" json:"actionName,omitempty" bson:"actionName,omitempty"`
// TimeoutSecs is the task timeout in seconds.
TimeoutSecs int `yaml:"timeoutSecs,omitempty" json:"timeoutSecs,omitempty" bson:"timeoutSecs,omitempty"`
// Params are free-form parameters for the action.
Params map[string]interface{} `yaml:"params,omitempty" json:"params,omitempty" bson:"params,omitempty"`
// PreChecks are the checks attached to the task.
// NOTE(review): the serialized key here is "preCheck", while TaskInstance uses
// "preChecks"; changing either would break stored data, so it is only flagged.
PreChecks PreChecks `yaml:"preCheck,omitempty" json:"preCheck,omitempty" bson:"preCheck,omitempty"`
}
Task
type TaskCondition ¶
// TaskCondition is a single condition of a Check; IsMeet evaluates it against
// a DagInstance.
// NOTE(review): presumably the value found under Key in Source is compared with
// Values using Op; the evaluation logic is not shown here — confirm.
type TaskCondition struct {
// Source selects where values are read from; the only constant shown is
// TaskConditionSourceVars ("vars").
Source TaskConditionSource `yaml:"source,omitempty" json:"source,omitempty" bson:"source,omitempty"`
// Key is the key looked up in Source.
Key string `yaml:"key,omitempty" json:"key,omitempty" bson:"key,omitempty"`
// Values are the operand values for Op.
Values []string `yaml:"values,omitempty" json:"values,omitempty" bson:"values,omitempty"`
// Op is the comparison operator (see Operator).
Op Operator `yaml:"op,omitempty" json:"op,omitempty" bson:"op,omitempty"`
}
TaskCondition
func (*TaskCondition) IsMeet ¶
func (c *TaskCondition) IsMeet(dagIns *DagInstance) bool
IsMeet reports whether the condition is met.
type TaskConditionSource ¶
// TaskConditionSource names the source a TaskCondition reads its values from.
type TaskConditionSource string

const (
	// TaskConditionSourceVars selects the "vars" source.
	TaskConditionSourceVars TaskConditionSource = "vars"
)
func (TaskConditionSource) BuildKvGetter ¶
func (t TaskConditionSource) BuildKvGetter(dagIns *DagInstance) utils.KeyValueGetter
BuildKvGetter
type TaskInstance ¶
// TaskInstance is one task of a DagInstance. The serialized fields mirror Task
// and add status and trace data; Patch, Context and RelatedDagInstance are
// runtime-only and excluded from JSON and BSON.
type TaskInstance struct {
// BaseInfo supplies ID, CreatedAt and UpdatedAt.
BaseInfo `bson:"inline"`
// TaskID is the task's ID; it should be unique within a dag instance.
TaskID string `json:"taskId,omitempty" bson:"taskId,omitempty"`
// DagInsID is the ID of the owning DagInstance.
DagInsID string `json:"dagInsId,omitempty" bson:"dagInsId,omitempty"`
Name string `json:"name,omitempty" bson:"name,omitempty"`
DependOn []string `json:"dependOn,omitempty" bson:"dependOn,omitempty"`
ActionName string `json:"actionName,omitempty" bson:"actionName,omitempty"`
// TimeoutSecs is always serialized (no omitempty), unlike Task.TimeoutSecs.
TimeoutSecs int `json:"timeoutSecs" bson:"timeoutSecs"`
Params map[string]interface{} `json:"params,omitempty" bson:"params,omitempty"`
// Traces holds the trace entries recorded for this task instance (see TraceInfo).
Traces []TraceInfo `json:"traces,omitempty" bson:"traces,omitempty"`
// Status is one of the TaskInstanceStatus constants.
Status TaskInstanceStatus `json:"status,omitempty" bson:"status,omitempty"`
Reason string `json:"reason,omitempty" bson:"reason,omitempty"`
// PreChecks is serialized as "preChecks" (Task uses "preCheck").
PreChecks PreChecks `json:"preChecks,omitempty" bson:"preChecks,omitempty"`
// Patch is the callback used to save changes to this task instance.
Patch func(*TaskInstance) error `json:"-" bson:"-"`
// Context is the execution context for this task instance.
Context run.ExecuteContext `json:"-" bson:"-"`
// RelatedDagInstance points to the DagInstance this task instance belongs to.
RelatedDagInstance *DagInstance `json:"-" bson:"-"`
// NOTE(review): TimeUsed is a string; its format is not shown here — confirm.
TimeUsed string `json:"timeUsed,omitempty" bson:"timeUsed,omitempty"`
// contains filtered or unexported fields
}
TaskInstance
func CtxRunningTaskIns ¶
func CtxRunningTaskIns(ctx context.Context) (*TaskInstance, bool)
func (*TaskInstance) DoPreCheck ¶
func (t *TaskInstance) DoPreCheck(dagIns *DagInstance) (isActive bool, err error)
DoPreCheck
func (*TaskInstance) GetGraphID ¶
func (t *TaskInstance) GetGraphID() string
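GetGraphID returns the ID used in the graph. Note that this is not the ID of the graph itself; it is the TaskID of this TaskInstance within the graph.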
func (*TaskInstance) InitialDep ¶
func (t *TaskInstance) InitialDep(ctx run.ExecuteContext, patch func(*TaskInstance) error, dagIns *DagInstance)
InitialDep
func (*TaskInstance) Run ¶
func (t *TaskInstance) Run(params interface{}, act run.Action) (err error)
Run action
func (*TaskInstance) SetStatus ¶
func (t *TaskInstance) SetStatus(s TaskInstanceStatus) error
SetStatus will persist task instance
type TaskInstanceStatus ¶
type TaskInstanceStatus string
TaskInstanceStatus
const ( TaskInstanceStatusInit TaskInstanceStatus = "init" TaskInstanceStatusCanceled TaskInstanceStatus = "canceled" TaskInstanceStatusRunning TaskInstanceStatus = "running" TaskInstanceStatusEnding TaskInstanceStatus = "ending" TaskInstanceStatusFailed TaskInstanceStatus = "failed" TaskInstanceStatusRetrying TaskInstanceStatus = "retrying" TaskInstanceStatusSuccess TaskInstanceStatus = "success" TaskInstanceStatusBlocked TaskInstanceStatus = "blocked" TaskInstanceStatusContinue TaskInstanceStatus = "continue" TaskInstanceStatusSkipped TaskInstanceStatus = "skipped" )