Documentation
¶
Index ¶
- Variables
- func BlockToClock(block *pbbstream.Block) *pbsubstreams.Clock
- func BuildRequestDetails(ctx context.Context, request *pbsubstreamsrpc.Request, ...) (req *reqctx.RequestDetails, undoSignal *pbsubstreamsrpc.BlockUndoSignal, ...)
- func BuildRequestDetailsFromSubrequest(ctx context.Context, request *pbssinternal.ProcessRangeRequest) (req *reqctx.RequestDetails)
- func NewStoreBoundary(interval uint64, requestStartBlockNum uint64, requestStopBlock uint64) *storeBoundary
- type CursorResolver
- type ForkHandler
- type Option
- func WithFinalBlocksOnly() Option
- func WithHeadBlockGetter(getter func() (uint64, error)) Option
- func WithHighestStage(stage uint32) Option
- func WithPendingUndoMessage(msg *pbsubstreamsrpc.Response) Option
- func WithPostBlockHook(f substreams.BlockHook) Option
- func WithPostJobHook(f substreams.PostJobHook) Option
- func WithPreBlockHook(f substreams.BlockHook) Option
- type Pipeline
- func (p *Pipeline) BuildModuleExecutors(ctx context.Context) error
- func (p *Pipeline) GetStoreMap() store.Map
- func (p *Pipeline) Init(ctx context.Context) (err error)
- func (p *Pipeline) InitTier1StoresAndBackprocess(ctx context.Context, reqPlan *plan.RequestPlan, noopMode bool) (bool, error)
- func (p *Pipeline) InitTier2Stores(ctx context.Context) (err error)
- func (p *Pipeline) LastCursor() *bstream.Cursor
- func (p *Pipeline) OnStreamTerminated(ctx context.Context, err error) error
- func (p *Pipeline) ProcessBlock(block *pbbstream.Block, obj interface{}) (err error)
- func (p *Pipeline) ProcessFromExecOutput(ctx context.Context, clock *pbsubstreams.Clock, cursor *bstream.Cursor) (err error)
- type Stores
- type UndoHandler
Constants ¶
This section is empty.
Variables ¶
View Source
var ErrDone = errors.New("done")
View Source
var ErrShuttingDown = errors.New("endpoint is shutting down, please reconnect")
View Source
var PrintStack = os.Getenv("SUBSTREAMS_PRINT_STACK") == "true" || os.Getenv("SUBSTREAMS_PRINT_STACK") == "1"
Functions ¶
func BlockToClock ¶ added in v1.8.1
func BlockToClock(block *pbbstream.Block) *pbsubstreams.Clock
func BuildRequestDetails ¶ added in v0.1.0
func BuildRequestDetails( ctx context.Context, request *pbsubstreamsrpc.Request, getRecentFinalBlock getBlockFunc, resolveCursor CursorResolver, getHeadBlock getBlockFunc, segmentSize uint64, ) (req *reqctx.RequestDetails, undoSignal *pbsubstreamsrpc.BlockUndoSignal, err error)
func BuildRequestDetailsFromSubrequest ¶ added in v1.0.2
func BuildRequestDetailsFromSubrequest(ctx context.Context, request *pbssinternal.ProcessRangeRequest) (req *reqctx.RequestDetails)
func NewStoreBoundary ¶ added in v0.0.21
Types ¶
type CursorResolver ¶ added in v1.0.2
type CursorResolver func(context.Context, *bstream.Cursor) (reorgJunctionBlock, currentHead bstream.BlockRef, err error)
func NewCursorResolver ¶ added in v1.0.2
func NewCursorResolver(hub *hub.ForkableHub, mergedBlocksStore, forkedBlocksStore dstore.Store) CursorResolver
type ForkHandler ¶ added in v0.0.20
type ForkHandler struct {
// contains filtered or unexported fields
}
TODO(abourget): The scope of this object and the Engine
are not pretty similar, to keep track of certain pieces of info that are reversible, and handle the back and forth between undos and redos. Perhaps what we could have here, is have those undo handlers live on the Pipeline (where it makes sense) and have some nested structs handle
func NewForkHandler ¶ added in v0.0.21
func NewForkHandler() *ForkHandler
type Option ¶
type Option func(p *Pipeline)
func WithFinalBlocksOnly ¶ added in v1.1.1
func WithFinalBlocksOnly() Option
func WithHeadBlockGetter ¶ added in v1.14.3
func WithHighestStage ¶ added in v1.1.9
func WithPendingUndoMessage ¶ added in v1.1.1
func WithPendingUndoMessage(msg *pbsubstreamsrpc.Response) Option
WithPendingUndoMessage allows sending a message right before we send the first 'BlockScopedData'
func WithPostBlockHook ¶
func WithPostBlockHook(f substreams.BlockHook) Option
func WithPostJobHook ¶
func WithPostJobHook(f substreams.PostJobHook) Option
func WithPreBlockHook ¶
func WithPreBlockHook(f substreams.BlockHook) Option
type Pipeline ¶
type Pipeline struct {
// StagedModuleExecutors represents all the modules within a stage that should be executed. The
// first level of the 2D list represents layer within a stage to execute sequentially.
// The second level contains modules to execute within a layer, those can be executed concurrently.
StagedModuleExecutors [][]exec.ModuleExecutor
// contains filtered or unexported fields
}
func New ¶
func New( ctx context.Context, isTier1 bool, execGraph *exec.Graph, stores *Stores, indices map[string]map[string]*roaring64.Bitmap, execoutStorage *execout.Configs, wasmRuntime *wasm.Registry, execOutputCache *cache.Engine, stateBundleSize uint64, workerPoolFactory work.WorkerPoolFactory, respFunc substreams.ResponseFunc, executionTimeout time.Duration, checkPendingShutdown func() bool, foundationalEndpoints map[string]string, opts ...Option, ) *Pipeline
func (*Pipeline) BuildModuleExecutors ¶ added in v1.6.0
BuildModuleExecutors builds the ModuleExecutors, and the loadedModules.
func (*Pipeline) GetStoreMap ¶ added in v0.1.0
func (*Pipeline) InitTier1StoresAndBackprocess ¶ added in v1.1.9
func (*Pipeline) InitTier2Stores ¶ added in v1.1.9
func (*Pipeline) LastCursor ¶ added in v1.16.5
func (*Pipeline) OnStreamTerminated ¶ added in v0.1.0
OnStreamTerminated performs flush of store and setting trailers when the stream terminated gracefully from our point of view. If the stream terminated gracefully, we return `nil` otherwise, the original is returned.
func (*Pipeline) ProcessBlock ¶ added in v0.0.14
func (*Pipeline) ProcessFromExecOutput ¶ added in v1.4.0
type Stores ¶ added in v0.1.0
func (*Stores) SetStoreMap ¶ added in v0.1.0
type UndoHandler ¶ added in v0.0.21
type UndoHandler func(clock *pbsubstreams.Clock, moduleOutputs []*pbssinternal.ModuleOutput)
Source Files
¶
Click to show internal directories.
Click to hide internal directories.