Documentation
¶
Index ¶
- Variables
- func NewAsyncStageWithTasks(name StageName, tasks ...Task) *stage
- func NewCustomStage(name StageName, runner stageRunner) *stage
- func NewStageWithTasks(name StageName, tasks ...Task) *stage
- type CustomPipeline
- type DefaultPipeline
- type HeightRange
- type Logger
- type Options
- type Payload
- type PayloadFactory
- type Pipeline
- type Sink
- type Source
- type Stage
- type StageName
- type StageRunnerFunc
- type Stat
- type StatsRecorder
- type Task
- type TaskName
- type TaskValidator
Constants ¶
This section is empty.
Variables ¶
var (
// ErrInvalidLatestHeight is returned when the latest height is a negative number
ErrInvalidLatestHeight = errors.New("lastest height is invalid")
// ErrInvalidInitialHeight is returned when the initial height is a negative number
ErrInvalidInitialHeight = errors.New("initial height is invalid")
// ErrInvalidBatchSize is returned when the batch size is a negative number
ErrInvalidBatchSize = errors.New("batch size is invalid")
// ErrNothingToProcess is returned when there are no heights to process
ErrNothingToProcess = errors.New("nothing to process")
)
var ( ErrMissingStages = errors.New("provide stages to run concurrently") ErrMissingStage = errors.New("no stage to run") )
Functions ¶
func NewAsyncStageWithTasks ¶ added in v0.1.8
NewAsyncStageWithTasks creates a stage with tasks that will run concurrently
func NewCustomStage ¶ added in v0.1.7
func NewCustomStage(name StageName, runner stageRunner) *stage
NewCustomStage creates a stage with a custom stage runner
func NewStageWithTasks ¶ added in v0.1.8
NewStageWithTasks creates a stage with tasks that will run one by one
Types ¶
type CustomPipeline ¶ added in v0.1.7
type CustomPipeline interface {
Pipeline
AddStage(stage *stage)
AddConcurrentStages(stages ...*stage)
}
CustomPipeline is implemented by types that want to create a pipeline by adding their own stages
func NewCustom ¶ added in v0.1.7
func NewCustom(payloadFactor PayloadFactory) CustomPipeline
NewCustom creates a new pipeline that satisfies CustomPipeline
type DefaultPipeline ¶ added in v0.1.7
type DefaultPipeline interface {
Pipeline
SetTasks(stageName StageName, tasks ...Task)
SetAsyncTasks(stageName StageName, tasks ...Task)
SetCustomStage(stageName StageName, stageRunnerFunc stageRunner)
}
DefaultPipeline is implemented by types that only want to configure existing stages in a pipeline
func NewDefault ¶ added in v0.1.7
func NewDefault(payloadFactor PayloadFactory) DefaultPipeline
NewDefault creates a new DefaultPipeline with all default stages set in default run order
type HeightRange ¶ added in v0.4.0
type HeightRange struct {
// The most recent height
LatestHeight int64
// The last processed height
LastHeight int64
// The starting height when no height has been processed yet
InitialHeight int64
// The number of heights processed in one run
BatchSize int64
}
HeightRange represents a range of heights to be processed
func (*HeightRange) EndHeight ¶ added in v0.4.0
func (hr *HeightRange) EndHeight() int64
EndHeight calculates the last height to process
func (*HeightRange) Length ¶ added in v0.4.0
func (hr *HeightRange) Length() int64
Length calculates the number of heights to process
func (*HeightRange) StartHeight ¶ added in v0.4.0
func (hr *HeightRange) StartHeight() int64
StartHeight calculates the first height to process
func (*HeightRange) Validate ¶ added in v0.4.0
func (hr *HeightRange) Validate(checkLength bool) error
Validate checks if the height range is valid
type Logger ¶
type Logger interface {
// Info logs info message
Info(string)
// Debug logs debug message
Debug(string)
}
Logger is implemented by types that want to hook up to the logging mechanism in the engine
type Payload ¶
type Payload interface {
// MarkAsProcessed is invoked by the pipeline when the payload
// reaches the end of execution for the current height
MarkAsProcessed()
}
Payload is implemented by values that can be sent through a pipeline.
type PayloadFactory ¶
PayloadFactory is implemented by objects which know how to create a payload for every height
type Pipeline ¶
type Pipeline interface {
SetLogger(l Logger)
AddStageBefore(existingStageName StageName, stage *stage)
AddStageAfter(existingStageName StageName, stage *stage)
RetryStage(existingStageName StageName, isTransient func(error) bool, maxRetries int)
Start(ctx context.Context, source Source, sink Sink, options *Options) error
Run(ctx context.Context, height int64, options *Options) (Payload, error)
}
type Source ¶
type Source interface {
// Next gets next height
Next(context.Context, Payload) bool
// Current returns current height
Current() int64
// Err returns the error, if any
Err() error
// Skip reports whether the given stage should be skipped
Skip(StageName) bool
}
Source is executed before processing of individual heights. It is responsible for getting start and end height.
type StageName ¶
type StageName string
const (
// Context key used by StatsRecorder
CtxStats = "stats"
// Setup stage (Chore): performs setup tasks
StageSetup StageName = "stage_setup"
// Fetcher stage (Syncing): fetches data for indexing
StageFetcher StageName = "stage_fetcher"
// Parser stage (Syncing): parses and normalizes fetched data to a single structure
StageParser StageName = "stage_parser"
// Validator stage (Syncing): validates parsed data
StageValidator StageName = "stage_validator"
// Syncer stage (Syncing): saves data to datastore
StageSyncer StageName = "stage_syncer"
// Sequencer stage (Indexing): Creates sequences from synced data (syncable)
StageSequencer StageName = "stage_sequencer"
// Aggregator stage (Indexing): Creates aggregates from synced data (syncable)
StageAggregator StageName = "stage_aggregator"
// StagePersistor stage (Indexing): Persists data to datastore
StagePersistor StageName = "stage_persistor"
// Cleanup stage (Chore): Cleans up after execution
StageCleanup StageName = "stage_cleanup"
)
type StageRunnerFunc ¶
type StageRunnerFunc func(context.Context, Payload, TaskValidator) error
StageRunnerFunc is an adapter to allow the use of plain functions as stageRunner
func (StageRunnerFunc) Run ¶
func (srf StageRunnerFunc) Run(ctx context.Context, p Payload, f TaskValidator) error
Run calls srf(ctx, p, f).
type Stat ¶
func (*Stat) SetCompleted ¶
SetCompleted completes stat duration
type StatsRecorder ¶
type StatsRecorder struct {
Stat
}
func NewStatsRecorder ¶
func NewStatsRecorder() *StatsRecorder
NewStatsRecorder returns a new StatsRecorder, which is responsible for recording statistics during pipeline execution. TODO: Add stats for every stage and every task
type Task ¶
type Task interface {
// Run runs the task
Run(context.Context, Payload) error
// GetName gets name of task
GetName() string
}
Task is implemented by types that want to be executed inside of a stage
type TaskValidator ¶
TaskValidator is a type for validating task by provided task name
