Documentation
¶
Index ¶
- Constants
- Variables
- func CheckInputs(inputs []Result, minLen, maxLen, maxErrors int) ([]interface{}, error)
- func NewORM(db *sqlx.DB, lggr logger.Logger, cfg pg.LogConfig) *orm
- func NewRunner(orm ORM, config Config, chainSet evm.ChainSet, ethks ETHKeyStore, ...) *runner
- func ParseETHABIArgsString(theABI []byte, isLog bool) (args abi.Arguments, indexedArgs abi.Arguments, _ error)
- func ResolveParam(out PipelineParamUnmarshaler, getters []GetterFunc) error
- type AddressParam
- type AddressSliceParam
- type AnyTask
- type BaseTask
- func (t *BaseTask) Base() *BaseTask
- func (t BaseTask) DotID() string
- func (t BaseTask) ID() int
- func (t BaseTask) Inputs() []Task
- func (t BaseTask) OutputIndex() int32
- func (t BaseTask) Outputs() []Task
- func (t BaseTask) TaskMaxBackoff() time.Duration
- func (t BaseTask) TaskMinBackoff() time.Duration
- func (t BaseTask) TaskRetries() uint32
- func (t BaseTask) TaskTimeout() (time.Duration, bool)
- type BoolParam
- type BridgeTask
- type BytesParam
- type CBORParseTask
- type Config
- type DecimalParam
- type DecimalSliceParam
- type DivideTask
- type ETHABIDecodeLogTask
- type ETHABIDecodeTask
- type ETHABIEncodeTask
- type ETHABIEncodeTask2
- type ETHCallTask
- type ETHKeyStore
- type ETHTxTask
- type ErrRunPanicked
- type EstimateGasLimitTask
- type FailTask
- type FinalResult
- type GasEstimator
- type GetterFunc
- type Graph
- type GraphNode
- type HTTPTask
- type HashSliceParam
- type JSONParseTask
- type JSONPathParam
- type JSONSerializable
- type Keypath
- type MapParam
- type MaybeBigIntParam
- type MaybeInt32Param
- type MaybeUint64Param
- type MeanTask
- type MedianTask
- type MemoTask
- type MergeTask
- type Method
- type ModeTask
- type MultiplyTask
- type ORM
- type ObjectParam
- type ObjectType
- type PanicTask
- type Pipeline
- type PipelineParamUnmarshaler
- type PossibleErrorResponses
- type Result
- type ResumeRequest
- type Run
- func (r *Run) ByDotID(id string) *TaskRun
- func (r Run) GetID() string
- func (r Run) HasErrors() bool
- func (r Run) HasFatalErrors() bool
- func (r *Run) SetID(value string) error
- func (r *Run) Status() RunStatus
- func (r *Run) StringAllErrors() []*string
- func (r *Run) StringFatalErrors() []*string
- func (r *Run) StringOutputs() ([]*string, error)
- func (Run) TableName() string
- type RunErrors
- type RunInfo
- type RunStatus
- type Runner
- type SliceParam
- type Spec
- type StringParam
- type SumTask
- type Task
- type TaskRun
- type TaskRunResult
- type TaskRunResults
- type TaskType
- type URLParam
- type Uint64Param
- type VRFKeyStore
- type VRFTask
- type VRFTaskV2
- type Vars
Constants ¶
const (
InputTaskKey = "input"
)
Variables ¶
var ( ErrWrongInputCardinality = errors.New("wrong number of task inputs") ErrBadInput = errors.New("bad input for task") ErrInputTaskErrored = errors.New("input task errored") ErrParameterEmpty = errors.New("parameter is empty") ErrTooManyErrors = errors.New("too many errors") ErrTimeout = errors.New("timeout") ErrTaskRunFailed = errors.New("task run failed") ErrCancelled = errors.New("task run cancelled (fail early)") )
var ( // PromPipelineTaskExecutionTime reports how long each pipeline task took to execute // TODO: Make private again after // https://app.clubhouse.io/chainlinklabs/story/6065/hook-keeper-up-to-use-tasks-in-the-pipeline PromPipelineTaskExecutionTime = promauto.NewGaugeVec(prometheus.GaugeOpts{ Name: "pipeline_task_execution_time", Help: "How long each pipeline task took to execute", }, []string{"job_id", "job_name", "task_id", "task_type"}, ) PromPipelineRunErrors = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "pipeline_run_errors", Help: "Number of errors for each pipeline spec", }, []string{"job_id", "job_name"}, ) PromPipelineRunTotalTimeToCompletion = promauto.NewGaugeVec(prometheus.GaugeOpts{ Name: "pipeline_run_total_time_to_completion", Help: "How long each pipeline run took to finish (from the moment it was created)", }, []string{"job_id", "job_name"}, ) PromPipelineTasksTotalFinished = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "pipeline_tasks_total_finished", Help: "The total number of pipeline tasks which have finished", }, []string{"job_id", "job_name", "task_id", "task_type", "status"}, ) )
var ( ErrKeypathNotFound = errors.New("keypath not found") ErrKeypathTooDeep = errors.New("keypath too deep (maximum 2 keys)") ErrVarsRoot = errors.New("cannot get/set the root of a pipeline.Vars") )
var (
ErrNoSuchBridge = errors.New("no such bridge exists")
)
var ErrOverflow = errors.New("overflow")
Functions ¶
func CheckInputs ¶ added in v0.10.8
func NewRunner ¶
func NewRunner(orm ORM, config Config, chainSet evm.ChainSet, ethks ETHKeyStore, vrfks VRFKeyStore, lggr logger.Logger) *runner
func ParseETHABIArgsString ¶ added in v1.1.0
func ResolveParam ¶ added in v0.10.8
func ResolveParam(out PipelineParamUnmarshaler, getters []GetterFunc) error
Types ¶
type AddressParam ¶ added in v0.10.9
func (*AddressParam) UnmarshalPipelineParam ¶ added in v0.10.9
func (a *AddressParam) UnmarshalPipelineParam(val interface{}) error
type AddressSliceParam ¶ added in v0.10.9
func (*AddressSliceParam) UnmarshalPipelineParam ¶ added in v0.10.9
func (s *AddressSliceParam) UnmarshalPipelineParam(val interface{}) error
type AnyTask ¶ added in v0.10.0
type AnyTask struct {
BaseTask `mapstructure:",squash"`
}
AnyTask picks a value at random from the set of non-errored inputs. If there are zero non-errored inputs then it returns an error.
type BaseTask ¶
type BaseTask struct {
Index int32 `mapstructure:"index" json:"-" `
Timeout time.Duration `mapstructure:"timeout"`
FailEarly bool `mapstructure:"failEarly"`
Retries null.Uint32 `mapstructure:"retries"`
MinBackoff time.Duration `mapstructure:"minBackoff"`
MaxBackoff time.Duration `mapstructure:"maxBackoff"`
// contains filtered or unexported fields
}
func NewBaseTask ¶ added in v0.9.9
func (BaseTask) OutputIndex ¶
func (BaseTask) TaskMaxBackoff ¶ added in v1.0.0
func (BaseTask) TaskMinBackoff ¶ added in v1.0.0
func (BaseTask) TaskRetries ¶ added in v1.0.0
type BoolParam ¶ added in v0.10.8
type BoolParam bool
func (*BoolParam) UnmarshalPipelineParam ¶ added in v0.10.8
type BridgeTask ¶
type BridgeTask struct {
BaseTask `mapstructure:",squash"`
Name string `json:"name"`
RequestData string `json:"requestData"`
IncludeInputAtKey string `json:"includeInputAtKey"`
Async string `json:"async"`
// contains filtered or unexported fields
}
Return types:
string
func (*BridgeTask) Type ¶
func (t *BridgeTask) Type() TaskType
type BytesParam ¶ added in v0.10.8
type BytesParam []byte
func (*BytesParam) UnmarshalPipelineParam ¶ added in v0.10.8
func (b *BytesParam) UnmarshalPipelineParam(val interface{}) error
type CBORParseTask ¶ added in v0.10.9
type CBORParseTask struct {
BaseTask `mapstructure:",squash"`
Data string `json:"data"`
Mode string `json:"mode"`
}
Return types:
map[string]interface{} with potential value types:
float64
string
bool
map[string]interface{}
[]interface{}
nil
func (*CBORParseTask) Type ¶ added in v0.10.9
func (t *CBORParseTask) Type() TaskType
type Config ¶
type Config interface {
BridgeResponseURL() *url.URL
DatabaseMaximumTxDuration() time.Duration
DatabaseURL() url.URL
DefaultHTTPLimit() int64
DefaultHTTPTimeout() models.Duration
DefaultMaxHTTPAttempts() uint
DefaultHTTPAllowUnrestrictedNetworkAccess() bool
TriggerFallbackDBPollInterval() time.Duration
JobPipelineMaxRunDuration() time.Duration
JobPipelineReaperInterval() time.Duration
JobPipelineReaperThreshold() time.Duration
}
type DecimalParam ¶ added in v0.10.8
func (DecimalParam) Decimal ¶ added in v0.10.8
func (d DecimalParam) Decimal() decimal.Decimal
func (*DecimalParam) UnmarshalPipelineParam ¶ added in v0.10.8
func (d *DecimalParam) UnmarshalPipelineParam(val interface{}) error
type DecimalSliceParam ¶ added in v0.10.8
func (*DecimalSliceParam) UnmarshalPipelineParam ¶ added in v0.10.8
func (s *DecimalSliceParam) UnmarshalPipelineParam(val interface{}) error
type DivideTask ¶ added in v0.10.9
type DivideTask struct {
BaseTask `mapstructure:",squash"`
Input string `json:"input"`
Divisor string `json:"divisor"`
Precision string `json:"precision"`
}
Return types:
*decimal.Decimal
func (*DivideTask) Type ¶ added in v0.10.9
func (t *DivideTask) Type() TaskType
type ETHABIDecodeLogTask ¶ added in v0.10.9
type ETHABIDecodeLogTask struct {
BaseTask `mapstructure:",squash"`
ABI string `json:"abi"`
Data string `json:"data"`
Topics string `json:"topics"`
}
Return types:
map[string]interface{} with any geth/abigen value type
func (*ETHABIDecodeLogTask) Type ¶ added in v0.10.9
func (t *ETHABIDecodeLogTask) Type() TaskType
type ETHABIDecodeTask ¶ added in v0.10.9
type ETHABIDecodeTask struct {
BaseTask `mapstructure:",squash"`
ABI string `json:"abi"`
Data string `json:"data"`
}
Return types:
map[string]interface{} with any geth/abigen value type
func (*ETHABIDecodeTask) Type ¶ added in v0.10.9
func (t *ETHABIDecodeTask) Type() TaskType
type ETHABIEncodeTask ¶ added in v0.10.9
type ETHABIEncodeTask struct {
BaseTask `mapstructure:",squash"`
ABI string `json:"abi"`
Data string `json:"data"`
}
Return types:
[]byte
func (*ETHABIEncodeTask) Type ¶ added in v0.10.9
func (t *ETHABIEncodeTask) Type() TaskType
type ETHABIEncodeTask2 ¶ added in v1.1.0
type ETHABIEncodeTask2 struct {
BaseTask `mapstructure:",squash"`
ABI string `json:"abi"`
Data string `json:"data"`
}
Return types:
[]byte
func (*ETHABIEncodeTask2) Type ¶ added in v1.1.0
func (t *ETHABIEncodeTask2) Type() TaskType
type ETHCallTask ¶ added in v0.10.9
type ETHCallTask struct {
BaseTask `mapstructure:",squash"`
Contract string `json:"contract"`
Data string `json:"data"`
Gas string `json:"gas"`
GasPrice string `json:"gasPrice"`
GasTipCap string `json:"gasTipCap"`
GasFeeCap string `json:"gasFeeCap"`
ExtractRevertReason bool `json:"extractRevertReason"`
EVMChainID string `json:"evmChainID" mapstructure:"evmChainID"`
// contains filtered or unexported fields
}
Return types:
[]byte
func (*ETHCallTask) Type ¶ added in v0.10.9
func (t *ETHCallTask) Type() TaskType
type ETHKeyStore ¶ added in v0.10.11
type ETHTxTask ¶ added in v0.10.9
type ETHTxTask struct {
BaseTask `mapstructure:",squash"`
From string `json:"from"`
To string `json:"to"`
Data string `json:"data"`
GasLimit string `json:"gasLimit"`
TxMeta string `json:"txMeta"`
MinConfirmations string `json:"minConfirmations"`
EVMChainID string `json:"evmChainID" mapstructure:"evmChainID"`
Simulate string `json:"simulate" mapstructure:"simulate"`
// contains filtered or unexported fields
}
Return types:
nil
type ErrRunPanicked ¶ added in v0.10.4
type ErrRunPanicked struct {
// contains filtered or unexported fields
}
When a task panics, we catch the panic and wrap it in an error for reporting to the scheduler.
func (ErrRunPanicked) Error ¶ added in v0.10.9
func (err ErrRunPanicked) Error() string
type EstimateGasLimitTask ¶ added in v1.0.0
type EstimateGasLimitTask struct {
BaseTask `mapstructure:",squash"`
Input string `json:"input"`
From string `json:"from"`
To string `json:"to"`
Multiplier string `json:"multiplier"`
Data string `json:"data"`
EVMChainID string `json:"evmChainID" mapstructure:"evmChainID"`
// contains filtered or unexported fields
}
Return types:
uint64
func (*EstimateGasLimitTask) Type ¶ added in v1.0.0
func (t *EstimateGasLimitTask) Type() TaskType
type FailTask ¶ added in v1.1.0
FailTask is like the Panic task but without all the drama and stack unwinding of a panic
type FinalResult ¶ added in v0.9.10
FinalResult is the result of a Run
func (FinalResult) HasErrors ¶ added in v0.9.10
func (result FinalResult) HasErrors() bool
HasErrors returns true if the final result has any errors
func (FinalResult) HasFatalErrors ¶ added in v1.1.0
func (result FinalResult) HasFatalErrors() bool
HasFatalErrors returns true if the final result has any errors
func (FinalResult) SingularResult ¶ added in v0.9.10
func (result FinalResult) SingularResult() (Result, error)
SingularResult returns a single result if the FinalResult only has one set of outputs/errors
type GasEstimator ¶ added in v1.0.0
type GetterFunc ¶ added in v0.10.8
type GetterFunc func() (interface{}, error)
func From ¶ added in v0.10.8
func From(getters ...interface{}) []GetterFunc
func Input ¶ added in v0.10.8
func Input(inputs []Result, index int) GetterFunc
func Inputs ¶ added in v0.10.8
func Inputs(inputs []Result) GetterFunc
func JSONWithVarExprs ¶ added in v0.10.8
func JSONWithVarExprs(s string, vars Vars, allowErrors bool) GetterFunc
func NonemptyString ¶ added in v0.10.8
func NonemptyString(s string) GetterFunc
func VarExpr ¶ added in v0.10.8
func VarExpr(s string, vars Vars) GetterFunc
type Graph ¶ added in v0.10.9
type Graph struct {
*simple.DirectedGraph
}
tree fulfills the graph.DirectedGraph interface, which makes it possible for us to `dot.Unmarshal(...)` a DOT string directly into it.
func (*Graph) UnmarshalText ¶ added in v0.10.9
type GraphNode ¶ added in v0.10.9
func (*GraphNode) Attributes ¶ added in v0.10.9
func (*GraphNode) SetAttribute ¶ added in v0.10.9
type HTTPTask ¶
type HTTPTask struct {
BaseTask `mapstructure:",squash"`
Method string
URL string
RequestData string `json:"requestData"`
AllowUnrestrictedNetworkAccess string
// contains filtered or unexported fields
}
Return types:
string
type HashSliceParam ¶ added in v0.10.9
func (*HashSliceParam) UnmarshalPipelineParam ¶ added in v0.10.9
func (s *HashSliceParam) UnmarshalPipelineParam(val interface{}) error
type JSONParseTask ¶
type JSONParseTask struct {
BaseTask `mapstructure:",squash"`
Path string `json:"path"`
Data string `json:"data"`
// Lax when disabled will return an error if the path does not exist
// Lax when enabled will return nil with no error if the path does not exist
Lax string
}
Return types:
float64
string
bool
map[string]interface{}
[]interface{}
nil
func (*JSONParseTask) Type ¶
func (t *JSONParseTask) Type() TaskType
type JSONPathParam ¶ added in v0.10.9
type JSONPathParam []string
func (*JSONPathParam) UnmarshalPipelineParam ¶ added in v0.10.9
func (p *JSONPathParam) UnmarshalPipelineParam(val interface{}) error
type JSONSerializable ¶
type JSONSerializable struct {
Val interface{}
Valid bool
}
func (*JSONSerializable) Empty ¶ added in v0.10.10
func (js *JSONSerializable) Empty() bool
func (JSONSerializable) MarshalJSON ¶
func (js JSONSerializable) MarshalJSON() ([]byte, error)
MarshalJSON implements custom marshaling logic
func (*JSONSerializable) Scan ¶
func (js *JSONSerializable) Scan(value interface{}) error
func (*JSONSerializable) UnmarshalJSON ¶
func (js *JSONSerializable) UnmarshalJSON(bs []byte) error
UnmarshalJSON implements custom unmarshaling logic
type MapParam ¶ added in v0.10.8
type MapParam map[string]interface{}
MapParam accepts maps or JSON-encoded strings
func (*MapParam) UnmarshalPipelineParam ¶ added in v0.10.8
type MaybeBigIntParam ¶ added in v1.1.0
type MaybeBigIntParam struct {
// contains filtered or unexported fields
}
func (MaybeBigIntParam) BigInt ¶ added in v1.1.0
func (p MaybeBigIntParam) BigInt() *big.Int
func (*MaybeBigIntParam) UnmarshalPipelineParam ¶ added in v1.1.0
func (p *MaybeBigIntParam) UnmarshalPipelineParam(val interface{}) error
type MaybeInt32Param ¶ added in v0.10.9
type MaybeInt32Param struct {
// contains filtered or unexported fields
}
func (MaybeInt32Param) Int32 ¶ added in v0.10.9
func (p MaybeInt32Param) Int32() (int32, bool)
func (*MaybeInt32Param) UnmarshalPipelineParam ¶ added in v0.10.9
func (p *MaybeInt32Param) UnmarshalPipelineParam(val interface{}) error
type MaybeUint64Param ¶ added in v0.10.8
type MaybeUint64Param struct {
// contains filtered or unexported fields
}
func (MaybeUint64Param) Uint64 ¶ added in v0.10.8
func (p MaybeUint64Param) Uint64() (uint64, bool)
func (*MaybeUint64Param) UnmarshalPipelineParam ¶ added in v0.10.8
func (p *MaybeUint64Param) UnmarshalPipelineParam(val interface{}) error
type MeanTask ¶ added in v0.10.9
type MeanTask struct {
BaseTask `mapstructure:",squash"`
Values string `json:"values"`
AllowedFaults string `json:"allowedFaults"`
Precision string `json:"precision"`
}
Return types:
*decimal.Decimal
type MedianTask ¶
type MedianTask struct {
BaseTask `mapstructure:",squash"`
Values string `json:"values"`
AllowedFaults string `json:"allowedFaults"`
}
Return types:
*decimal.Decimal
func (*MedianTask) Type ¶
func (t *MedianTask) Type() TaskType
type MergeTask ¶ added in v1.1.0
type MergeTask struct {
BaseTask `mapstructure:",squash"`
Left string `json:"left"`
Right string `json:"right"`
}
Return types:
map[string]interface{}
type Method ¶ added in v1.1.0
go-ethereum's abi.Method doesn't implement json.Marshal for Type, but otherwise would have worked fine, in any case we only care about these...
type ModeTask ¶ added in v0.10.9
type ModeTask struct {
BaseTask `mapstructure:",squash"`
Values string `json:"values"`
AllowedFaults string `json:"allowedFaults"`
}
Return types:
map[string]interface{}{
"results": []interface{} containing any other type other pipeline tasks can return
"occurrences": (int64)
}
type MultiplyTask ¶
type MultiplyTask struct {
BaseTask `mapstructure:",squash"`
Input string `json:"input"`
Times string `json:"times"`
}
Return types:
*decimal.Decimal
func (*MultiplyTask) Type ¶
func (t *MultiplyTask) Type() TaskType
type ORM ¶
type ORM interface {
CreateSpec(pipeline Pipeline, maxTaskTimeout models.Interval, qopts ...pg.QOpt) (int32, error)
CreateRun(run *Run, qopts ...pg.QOpt) (err error)
InsertRun(run *Run, qopts ...pg.QOpt) error
DeleteRun(id int64) error
StoreRun(run *Run, qopts ...pg.QOpt) (restart bool, err error)
UpdateTaskRunResult(taskID uuid.UUID, result Result) (run Run, start bool, err error)
InsertFinishedRun(run *Run, saveSuccessfulTaskRuns bool, qopts ...pg.QOpt) (err error)
DeleteRunsOlderThan(context.Context, time.Duration) error
FindRun(id int64) (Run, error)
GetAllRuns() ([]Run, error)
GetUnfinishedRuns(context.Context, time.Time, func(run Run) error) error
GetQ() pg.Q
}
type ObjectParam ¶ added in v1.1.0
type ObjectParam struct {
Type ObjectType
BoolValue BoolParam
DecimalValue DecimalParam
StringValue StringParam
SliceValue SliceParam
MapValue MapParam
}
ObjectParam represents a kind of any type that could be used by the memo task
func MustNewObjectParam ¶ added in v1.1.0
func MustNewObjectParam(val interface{}) *ObjectParam
func (ObjectParam) Marshal ¶ added in v1.1.0
func (o ObjectParam) Marshal() (string, error)
func (ObjectParam) MarshalJSON ¶ added in v1.1.0
func (o ObjectParam) MarshalJSON() ([]byte, error)
func (ObjectParam) String ¶ added in v1.1.0
func (o ObjectParam) String() string
func (*ObjectParam) UnmarshalPipelineParam ¶ added in v1.1.0
func (o *ObjectParam) UnmarshalPipelineParam(val interface{}) error
type ObjectType ¶ added in v1.1.0
type ObjectType int
const ( NilType ObjectType = iota BoolType DecimalType StringType SliceType MapType )
type Pipeline ¶ added in v0.10.9
func (*Pipeline) MinTimeout ¶ added in v0.10.9
func (*Pipeline) RequiresPreInsert ¶ added in v1.0.0
func (*Pipeline) UnmarshalText ¶ added in v0.10.9
type PipelineParamUnmarshaler ¶ added in v0.10.8
type PipelineParamUnmarshaler interface {
UnmarshalPipelineParam(val interface{}) error
}
type PossibleErrorResponses ¶
type Result ¶
type Result struct {
Value interface{}
Error error
}
Result is the result of a TaskRun
func (Result) ErrorDB ¶ added in v0.9.10
ErrorDB dumps a single result error for a pipeline_task_run
func (Result) OutputDB ¶ added in v0.9.10
func (result Result) OutputDB() JSONSerializable
OutputDB dumps a single result output for a pipeline_run or pipeline_task_run
type ResumeRequest ¶ added in v1.1.0
type ResumeRequest struct {
Error null.String `json:"error"`
Value json.RawMessage `json:"value"`
}
func (ResumeRequest) ToResult ¶ added in v1.1.0
func (rr ResumeRequest) ToResult() (Result, error)
type Run ¶
type Run struct {
ID int64 `json:"-"`
PipelineSpecID int32 `json:"-"`
PipelineSpec Spec `json:"pipelineSpec"`
Meta JSONSerializable `json:"meta"`
// The errors are only ever strings
// DB example: [null, null, "my error"]
AllErrors RunErrors `json:"all_errors"`
FatalErrors RunErrors `json:"fatal_errors"`
Inputs JSONSerializable `json:"inputs"`
// Its expected that Output.Val is of type []interface{}.
// DB example: [1234, {"a": 10}, null]
Outputs JSONSerializable `json:"outputs"`
CreatedAt time.Time `json:"createdAt"`
FinishedAt null.Time `json:"finishedAt"`
PipelineTaskRuns []TaskRun `json:"taskRuns"`
State RunStatus `json:"state"`
Pending bool
FailEarly bool
}
func (Run) HasFatalErrors ¶ added in v1.1.0
func (*Run) StringAllErrors ¶ added in v1.1.0
func (*Run) StringFatalErrors ¶ added in v1.1.0
func (*Run) StringOutputs ¶ added in v1.1.0
type RunStatus ¶ added in v0.10.3
type RunStatus string
RunStatus represents the status of a run
const ( // RunStatusUnknown is the when the run status cannot be determined. RunStatusUnknown RunStatus = "unknown" // RunStatusRunning is used for when a run is actively being executed. RunStatusRunning RunStatus = "running" // RunStatusSuspended is used when a run is paused and awaiting further results. RunStatusSuspended RunStatus = "suspended" // RunStatusErrored is used for when a run has errored and will not complete. RunStatusErrored RunStatus = "errored" // RunStatusCompleted is used for when a run has successfully completed execution. RunStatusCompleted RunStatus = "completed" )
func (RunStatus) Completed ¶ added in v0.10.3
Completed returns true if the status is RunStatusCompleted.
type Runner ¶
type Runner interface {
service.Service
// Run is a blocking call that will execute the run until no further progress can be made.
// If `incomplete` is true, the run is only partially complete and is suspended, awaiting to be resumed when more data comes in.
// Note that `saveSuccessfulTaskRuns` value is ignored if the run contains async tasks.
Run(ctx context.Context, run *Run, l logger.Logger, saveSuccessfulTaskRuns bool, fn func(tx pg.Queryer) error) (incomplete bool, err error)
ResumeRun(taskID uuid.UUID, value interface{}, err error) error
// We expect spec.JobID and spec.JobName to be set for logging/prometheus.
// ExecuteRun executes a new run in-memory according to a spec and returns the results.
ExecuteRun(ctx context.Context, spec Spec, vars Vars, l logger.Logger) (run Run, trrs TaskRunResults, err error)
// InsertFinishedRun saves the run results in the database.
InsertFinishedRun(run *Run, saveSuccessfulTaskRuns bool, qopts ...pg.QOpt) error
// ExecuteAndInsertFinishedRun executes a new run in-memory according to a spec, persists and saves the results.
// It is a combination of ExecuteRun and InsertFinishedRun.
// Note that the spec MUST have a DOT graph for this to work.
ExecuteAndInsertFinishedRun(ctx context.Context, spec Spec, vars Vars, l logger.Logger, saveSuccessfulTaskRuns bool) (runID int64, finalResult FinalResult, err error)
OnRunFinished(func(*Run))
}
type SliceParam ¶ added in v0.10.8
type SliceParam []interface{}
func (SliceParam) FilterErrors ¶ added in v0.10.8
func (s SliceParam) FilterErrors() (SliceParam, int)
func (*SliceParam) UnmarshalPipelineParam ¶ added in v0.10.8
func (s *SliceParam) UnmarshalPipelineParam(val interface{}) error
type Spec ¶
type StringParam ¶ added in v0.10.8
type StringParam string
func (*StringParam) UnmarshalPipelineParam ¶ added in v0.10.8
func (s *StringParam) UnmarshalPipelineParam(val interface{}) error
type SumTask ¶ added in v0.10.9
type SumTask struct {
BaseTask `mapstructure:",squash"`
Values string `json:"values"`
AllowedFaults string `json:"allowedFaults"`
}
Return types:
*decimal.Decimal
type Task ¶
type Task interface {
Type() TaskType
ID() int
DotID() string
Run(ctx context.Context, lggr logger.Logger, vars Vars, inputs []Result) (Result, RunInfo)
Base() *BaseTask
Outputs() []Task
Inputs() []Task
OutputIndex() int32
TaskTimeout() (time.Duration, bool)
TaskRetries() uint32
TaskMinBackoff() time.Duration
TaskMaxBackoff() time.Duration
}
type TaskRun ¶
type TaskRun struct {
ID uuid.UUID `json:"id"`
Type TaskType `json:"type"`
PipelineRun Run `json:"-"`
PipelineRunID int64 `json:"-"`
Output JSONSerializable `json:"output"`
Error null.String `json:"error"`
CreatedAt time.Time `json:"createdAt"`
FinishedAt null.Time `json:"finishedAt"`
Index int32 `json:"index"`
DotID string `json:"dotId"`
// contains filtered or unexported fields
}
type TaskRunResult ¶ added in v0.9.10
type TaskRunResult struct {
ID uuid.UUID
Task Task
TaskRun TaskRun
Result Result
Attempts uint
CreatedAt time.Time
FinishedAt null.Time
// contains filtered or unexported fields
}
TaskRunResult describes the result of a task run, suitable for database update or insert. ID might be zero if the TaskRun has not been inserted yet TaskSpecID will always be non-zero
func (*TaskRunResult) IsPending ¶ added in v0.10.10
func (result *TaskRunResult) IsPending() bool
func (*TaskRunResult) IsTerminal ¶ added in v0.9.10
func (result *TaskRunResult) IsTerminal() bool
type TaskRunResults ¶ added in v0.9.10
type TaskRunResults []TaskRunResult
TaskRunResults represents a collection of results for all task runs for one pipeline run
func (TaskRunResults) FinalResult ¶ added in v0.9.10
func (trrs TaskRunResults) FinalResult(l logger.Logger) FinalResult
FinalResult pulls the FinalResult for the pipeline_run from the task runs It needs to respect the output index of each task
type TaskType ¶
type TaskType string
const ( TaskTypeHTTP TaskType = "http" TaskTypeBridge TaskType = "bridge" TaskTypeMean TaskType = "mean" TaskTypeMedian TaskType = "median" TaskTypeMode TaskType = "mode" TaskTypeSum TaskType = "sum" TaskTypeMultiply TaskType = "multiply" TaskTypeDivide TaskType = "divide" TaskTypeJSONParse TaskType = "jsonparse" TaskTypeCBORParse TaskType = "cborparse" TaskTypeAny TaskType = "any" TaskTypeVRF TaskType = "vrf" TaskTypeVRFV2 TaskType = "vrfv2" TaskTypeEstimateGasLimit TaskType = "estimategaslimit" TaskTypeETHCall TaskType = "ethcall" TaskTypeETHTx TaskType = "ethtx" TaskTypeETHABIEncode TaskType = "ethabiencode" TaskTypeETHABIEncode2 TaskType = "ethabiencode2" TaskTypeETHABIDecode TaskType = "ethabidecode" TaskTypeETHABIDecodeLog TaskType = "ethabidecodelog" TaskTypeMerge TaskType = "merge" // Testing only. TaskTypePanic TaskType = "panic" TaskTypeMemo TaskType = "memo" TaskTypeFail TaskType = "fail" )
type URLParam ¶ added in v0.10.8
func (*URLParam) UnmarshalPipelineParam ¶ added in v0.10.8
type Uint64Param ¶ added in v0.10.8
type Uint64Param uint64
func (*Uint64Param) UnmarshalPipelineParam ¶ added in v0.10.8
func (u *Uint64Param) UnmarshalPipelineParam(val interface{}) error
type VRFKeyStore ¶ added in v0.10.11
type VRFTask ¶ added in v0.10.8
type VRFTask struct {
BaseTask `mapstructure:",squash"`
PublicKey string `json:"publicKey"`
RequestBlockHash string `json:"requestBlockHash"`
RequestBlockNumber string `json:"requestBlockNumber"`
Topics string `json:"topics"`
// contains filtered or unexported fields
}
type VRFTaskV2 ¶ added in v1.0.0
type VRFTaskV2 struct {
BaseTask `mapstructure:",squash"`
PublicKey string `json:"publicKey"`
RequestBlockHash string `json:"requestBlockHash"`
RequestBlockNumber string `json:"requestBlockNumber"`
Topics string `json:"topics"`
// contains filtered or unexported fields
}
type Vars ¶ added in v0.10.8
type Vars struct {
// contains filtered or unexported fields
}
func NewVarsFrom ¶ added in v0.10.8
Source Files
¶
- common.go
- common_eth.go
- common_http.go
- graph.go
- models.go
- orm.go
- runner.go
- scheduler.go
- task.any.go
- task.base.go
- task.bridge.go
- task.cborparse.go
- task.divide.go
- task.estimategas.go
- task.eth_abi_decode.go
- task.eth_abi_decode_log.go
- task.eth_abi_encode.go
- task.eth_abi_encode_2.go
- task.eth_call.go
- task.eth_tx.go
- task.fail.go
- task.http.go
- task.jsonparse.go
- task.mean.go
- task.median.go
- task.memo.go
- task.merge.go
- task.mode.go
- task.multiply.go
- task.panic.go
- task.sum.go
- task.vrf.go
- task.vrfv2.go
- task_object_params.go
- task_params.go
- variables.go