Documentation
¶
Index ¶
- type AssetInstance
- func (t *AssetInstance) AddDownstream(task TaskInstance)
- func (t *AssetInstance) AddUpstream(task TaskInstance)
- func (t *AssetInstance) Blocking() bool
- func (t *AssetInstance) Completed() bool
- func (t *AssetInstance) GetAsset() *pipeline.Asset
- func (t *AssetInstance) GetDownstream() []TaskInstance
- func (t *AssetInstance) GetHumanID() string
- func (t *AssetInstance) GetHumanReadableDescription() string
- func (t *AssetInstance) GetPipeline() *pipeline.Pipeline
- func (t *AssetInstance) GetStatus() TaskInstanceStatus
- func (t *AssetInstance) GetType() TaskInstanceType
- func (t *AssetInstance) GetUpstream() []TaskInstance
- func (t *AssetInstance) MarkAs(status TaskInstanceStatus)
- type ColumnCheckInstance
- type CustomCheckInstance
- type InstancesByType
- type Metadata
- type MetadataPushInstance
- type PipelineAssetState
- type PipelineState
- type RunConfig
- type Scheduler
- func (s *Scheduler) FindMajorityOfTypes(defaultIfNone pipeline.AssetType) pipeline.AssetType
- func (s *Scheduler) GetAssetCountWithTasksPending() int
- func (s *Scheduler) GetTaskInstancesByStatus(status TaskInstanceStatus) []TaskInstance
- func (s *Scheduler) InstanceCount() int
- func (s *Scheduler) InstanceCountByStatus(status TaskInstanceStatus) int
- func (s *Scheduler) Kickstart()
- func (s *Scheduler) MarkAll(status TaskInstanceStatus)
- func (s *Scheduler) MarkAsset(task *pipeline.Asset, status TaskInstanceStatus, downstream bool)
- func (s *Scheduler) MarkByTag(tag string, status TaskInstanceStatus, downstream bool)
- func (s *Scheduler) MarkCheckInstancesByID(checkID string, status TaskInstanceStatus) error
- func (s *Scheduler) MarkPendingInstancesByType(instanceType TaskInstanceType, status TaskInstanceStatus)
- func (s *Scheduler) MarkTaskInstance(instance TaskInstance, status TaskInstanceStatus, downstream bool)
- func (s *Scheduler) MarkTaskInstanceIfNotSkipped(instance TaskInstance, status TaskInstanceStatus, markDownstream bool)
- func (s *Scheduler) RestoreState(state *PipelineState) error
- func (s *Scheduler) Run(ctx context.Context) []*TaskExecutionResult
- func (s *Scheduler) SavePipelineState(fs afero.Fs, param *RunConfig, runID, statePath string) error
- func (s *Scheduler) Tick(result *TaskExecutionResult) bool
- func (s *Scheduler) WillRunTaskOfType(taskType pipeline.AssetType) bool
- type TaskExecutionResult
- type TaskInstance
- type TaskInstanceStatus
- type TaskInstanceType
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AssetInstance ¶
type AssetInstance struct {
ID string
HumanID string
Pipeline *pipeline.Pipeline
Asset *pipeline.Asset
// contains filtered or unexported fields
}
func (*AssetInstance) AddDownstream ¶
func (t *AssetInstance) AddDownstream(task TaskInstance)
func (*AssetInstance) AddUpstream ¶
func (t *AssetInstance) AddUpstream(task TaskInstance)
func (*AssetInstance) Blocking ¶ added in v0.9.0
func (t *AssetInstance) Blocking() bool
func (*AssetInstance) Completed ¶
func (t *AssetInstance) Completed() bool
func (*AssetInstance) GetAsset ¶
func (t *AssetInstance) GetAsset() *pipeline.Asset
func (*AssetInstance) GetDownstream ¶
func (t *AssetInstance) GetDownstream() []TaskInstance
func (*AssetInstance) GetHumanID ¶
func (t *AssetInstance) GetHumanID() string
func (*AssetInstance) GetHumanReadableDescription ¶ added in v0.11.29
func (t *AssetInstance) GetHumanReadableDescription() string
func (*AssetInstance) GetPipeline ¶
func (t *AssetInstance) GetPipeline() *pipeline.Pipeline
func (*AssetInstance) GetStatus ¶
func (t *AssetInstance) GetStatus() TaskInstanceStatus
func (*AssetInstance) GetType ¶
func (t *AssetInstance) GetType() TaskInstanceType
func (*AssetInstance) GetUpstream ¶
func (t *AssetInstance) GetUpstream() []TaskInstance
func (*AssetInstance) MarkAs ¶
func (t *AssetInstance) MarkAs(status TaskInstanceStatus)
type ColumnCheckInstance ¶
type ColumnCheckInstance struct {
*AssetInstance
Column *pipeline.Column
Check *pipeline.ColumnCheck
// contains filtered or unexported fields
}
func (*ColumnCheckInstance) Blocking ¶ added in v0.9.0
func (t *ColumnCheckInstance) Blocking() bool
func (*ColumnCheckInstance) GetHumanReadableDescription ¶ added in v0.11.29
func (t *ColumnCheckInstance) GetHumanReadableDescription() string
func (*ColumnCheckInstance) GetType ¶
func (t *ColumnCheckInstance) GetType() TaskInstanceType
type CustomCheckInstance ¶
type CustomCheckInstance struct {
*AssetInstance
Check *pipeline.CustomCheck
}
func (*CustomCheckInstance) Blocking ¶ added in v0.9.0
func (t *CustomCheckInstance) Blocking() bool
func (*CustomCheckInstance) GetHumanReadableDescription ¶ added in v0.11.29
func (t *CustomCheckInstance) GetHumanReadableDescription() string
func (*CustomCheckInstance) GetType ¶
func (t *CustomCheckInstance) GetType() TaskInstanceType
type InstancesByType ¶
type InstancesByType map[TaskInstanceType][]TaskInstance
func (InstancesByType) AddDownstreamByType ¶
func (i InstancesByType) AddDownstreamByType(instanceType TaskInstanceType, downstream TaskInstance)
func (InstancesByType) AddUpstreamByType ¶
func (i InstancesByType) AddUpstreamByType(instanceType TaskInstanceType, upstream TaskInstance)
type MetadataPushInstance ¶ added in v0.9.0
type MetadataPushInstance struct {
*AssetInstance
}
func (*MetadataPushInstance) Blocking ¶ added in v0.9.0
func (t *MetadataPushInstance) Blocking() bool
func (*MetadataPushInstance) GetHumanReadableDescription ¶ added in v0.11.29
func (t *MetadataPushInstance) GetHumanReadableDescription() string
func (*MetadataPushInstance) GetType ¶ added in v0.9.0
func (t *MetadataPushInstance) GetType() TaskInstanceType
type PipelineAssetState ¶ added in v0.11.122
type PipelineState ¶ added in v0.11.122
type PipelineState struct {
Parameters RunConfig `json:"parameters"`
Metadata Metadata `json:"metadata"`
State []*PipelineAssetState `json:"state"`
Version string `json:"version"`
TimeStamp time.Time `json:"timestamp"`
RunID string `json:"run_id"`
CompatibilityHash string `json:"compatibility_hash"`
}
type RunConfig ¶ added in v0.11.123
type RunConfig struct {
Downstream bool `json:"downstream"`
StartDate string `json:"startDate"`
EndDate string `json:"endDate"`
Workers int `json:"workers"`
Environment string `json:"environment"`
Force bool `json:"force"`
PushMetadata bool `json:"pushMetadata"`
NoLogFile bool `json:"noLogFile"`
FullRefresh bool `json:"fullRefresh"`
UsePip bool `json:"useUV"`
Tag string `json:"tag"`
ExcludeTag string `json:"excludeTag"`
Only []string `json:"only"`
Output string `json:"output"`
ExpUseWingetForUv bool `json:"expUseWingetForUv"`
ConfigFilePath string `json:"configFilePath"`
SensorMode string `json:"sensorMode"`
ApplyIntervalModifiers bool `json:"applyIntervalModifiers"`
Annotations string `json:"annotations"`
}
type Scheduler ¶
type Scheduler struct {
WorkQueue chan TaskInstance
Results chan *TaskExecutionResult
// contains filtered or unexported fields
}
func NewScheduler ¶
func (*Scheduler) FindMajorityOfTypes ¶ added in v0.4.3
func (*Scheduler) GetAssetCountWithTasksPending ¶ added in v0.11.293
GetAssetCountWithTasksPending returns the number of assets that have tasks (whether checks, main tasks, or metadata pushes) pending.
func (*Scheduler) GetTaskInstancesByStatus ¶
func (s *Scheduler) GetTaskInstancesByStatus(status TaskInstanceStatus) []TaskInstance
func (*Scheduler) InstanceCount ¶
func (*Scheduler) InstanceCountByStatus ¶
func (s *Scheduler) InstanceCountByStatus(status TaskInstanceStatus) int
func (*Scheduler) Kickstart ¶
func (s *Scheduler) Kickstart()
Kickstart initiates the scheduler process by sending a "start" task for processing.
func (*Scheduler) MarkAll ¶
func (s *Scheduler) MarkAll(status TaskInstanceStatus)
func (*Scheduler) MarkAsset ¶ added in v0.11.29
func (s *Scheduler) MarkAsset(task *pipeline.Asset, status TaskInstanceStatus, downstream bool)
func (*Scheduler) MarkByTag ¶ added in v0.11.0
func (s *Scheduler) MarkByTag(tag string, status TaskInstanceStatus, downstream bool)
func (*Scheduler) MarkCheckInstancesByID ¶ added in v0.11.192
func (s *Scheduler) MarkCheckInstancesByID(checkID string, status TaskInstanceStatus) error
func (*Scheduler) MarkPendingInstancesByType ¶ added in v0.11.29
func (s *Scheduler) MarkPendingInstancesByType(instanceType TaskInstanceType, status TaskInstanceStatus)
func (*Scheduler) MarkTaskInstance ¶
func (s *Scheduler) MarkTaskInstance(instance TaskInstance, status TaskInstanceStatus, downstream bool)
func (*Scheduler) MarkTaskInstanceIfNotSkipped ¶ added in v0.11.114
func (s *Scheduler) MarkTaskInstanceIfNotSkipped(instance TaskInstance, status TaskInstanceStatus, markDownstream bool)
func (*Scheduler) RestoreState ¶ added in v0.11.123
func (s *Scheduler) RestoreState(state *PipelineState) error
func (*Scheduler) SavePipelineState ¶ added in v0.11.122
func (*Scheduler) Tick ¶
func (s *Scheduler) Tick(result *TaskExecutionResult) bool
Tick marks an iteration of the scheduler loop. It is called when a result is received. The results are mainly fed from a channel, but Tick allows implementing additional methods of passing Asset results and simulating scheduler loops, e.g. time travel. It is also useful for testing purposes.
type TaskExecutionResult ¶
type TaskExecutionResult struct {
Instance TaskInstance
Error error
}
type TaskInstance ¶
type TaskInstance interface {
GetPipeline() *pipeline.Pipeline
GetAsset() *pipeline.Asset
GetType() TaskInstanceType
GetHumanID() string
GetHumanReadableDescription() string
GetStatus() TaskInstanceStatus
MarkAs(status TaskInstanceStatus)
Completed() bool
Blocking() bool
GetUpstream() []TaskInstance
GetDownstream() []TaskInstance
AddUpstream(t TaskInstance)
AddDownstream(t TaskInstance)
}
type TaskInstanceStatus ¶
type TaskInstanceStatus int
const (
Pending TaskInstanceStatus = iota
Queued
Running
Failed
UpstreamFailed
Succeeded
Skipped
)
func GetStatusForTask ¶ added in v0.11.122
func GetStatusForTask(tasks []TaskInstanceStatus) TaskInstanceStatus
func StatusFromString ¶ added in v0.11.123
func StatusFromString(status string) TaskInstanceStatus
func (TaskInstanceStatus) String ¶
func (s TaskInstanceStatus) String() string
type TaskInstanceType ¶
type TaskInstanceType int
const (
TaskInstanceTypeMain TaskInstanceType = iota
TaskInstanceTypeColumnCheck
TaskInstanceTypeCustomCheck
TaskInstanceTypeMetadataPush
)
func (TaskInstanceType) String ¶
func (s TaskInstanceType) String() string
Click to show internal directories.
Click to hide internal directories.