pipeline

package
v1.17.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 12, 2025 License: Apache-2.0 Imports: 55 Imported by: 2

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrDone = errors.New("done")
View Source
var ErrShuttingDown = errors.New("endpoint is shutting down, please reconnect")
View Source
var PrintStack = os.Getenv("SUBSTREAMS_PRINT_STACK") == "true" || os.Getenv("SUBSTREAMS_PRINT_STACK") == "1"

Functions

func BlockToClock added in v1.8.1

func BlockToClock(block *pbbstream.Block) *pbsubstreams.Clock

func BuildRequestDetails added in v0.1.0

func BuildRequestDetails(
	ctx context.Context,
	request *pbsubstreamsrpc.Request,
	getRecentFinalBlock getBlockFunc,
	resolveCursor CursorResolver,
	getHeadBlock getBlockFunc,
	segmentSize uint64,
) (req *reqctx.RequestDetails, undoSignal *pbsubstreamsrpc.BlockUndoSignal, err error)

func BuildRequestDetailsFromSubrequest added in v1.0.2

func BuildRequestDetailsFromSubrequest(ctx context.Context, request *pbssinternal.ProcessRangeRequest) (req *reqctx.RequestDetails)

func NewStoreBoundary added in v0.0.21

func NewStoreBoundary(
	interval uint64,
	requestStartBlockNum uint64,
	requestStopBlock uint64,
) *storeBoundary

Types

type CursorResolver added in v1.0.2

type CursorResolver func(context.Context, *bstream.Cursor) (reorgJunctionBlock, currentHead bstream.BlockRef, err error)

func NewCursorResolver added in v1.0.2

func NewCursorResolver(hub *hub.ForkableHub, mergedBlocksStore, forkedBlocksStore dstore.Store) CursorResolver

type ForkHandler added in v0.0.20

type ForkHandler struct {
	// contains filtered or unexported fields
}

TODO(abourget): The scope of this object and the Engine

are not pretty similar, to keep track of certain pieces
of info that are reversible, and handle the back and forth
between undos and redos.
Perhaps what we could have here, is have those undo handlers
live on the Pipeline (where it makes sense)
and have some nested structs handle

func NewForkHandler added in v0.0.21

func NewForkHandler() *ForkHandler

type Option

type Option func(p *Pipeline)

func WithFinalBlocksOnly added in v1.1.1

func WithFinalBlocksOnly() Option

func WithHeadBlockGetter added in v1.14.3

func WithHeadBlockGetter(getter func() (uint64, error)) Option

func WithHighestStage added in v1.1.9

func WithHighestStage(stage uint32) Option

func WithPendingUndoMessage added in v1.1.1

func WithPendingUndoMessage(msg *pbsubstreamsrpc.Response) Option

WithPendingUndoMessage allows sending a message right before we send the first 'BlockScopedData'

func WithPostBlockHook

func WithPostBlockHook(f substreams.BlockHook) Option

func WithPostJobHook

func WithPostJobHook(f substreams.PostJobHook) Option

func WithPreBlockHook

func WithPreBlockHook(f substreams.BlockHook) Option

type Pipeline

type Pipeline struct {

	// StagedModuleExecutors represents all the modules within a stage that should be executed. The
	// first level of the 2D list represents layer within a stage to execute sequentially.
	// The second level contains modules to execute within a layer, those can be executed concurrently.
	StagedModuleExecutors [][]exec.ModuleExecutor
	// contains filtered or unexported fields
}

func New

func New(
	ctx context.Context,
	isTier1 bool,
	execGraph *exec.Graph,
	stores *Stores,
	indices map[string]map[string]*roaring64.Bitmap,
	execoutStorage *execout.Configs,
	wasmRuntime *wasm.Registry,
	execOutputCache *cache.Engine,
	stateBundleSize uint64,
	workerPoolFactory work.WorkerPoolFactory,
	respFunc substreams.ResponseFunc,
	executionTimeout time.Duration,
	checkPendingShutdown func() bool,
	foundationalEndpoints map[string]string,
	opts ...Option,
) *Pipeline

func (*Pipeline) BuildModuleExecutors added in v1.6.0

func (p *Pipeline) BuildModuleExecutors(ctx context.Context) error

BuildModuleExecutors builds the ModuleExecutors, and the loadedModules.

func (*Pipeline) GetStoreMap added in v0.1.0

func (p *Pipeline) GetStoreMap() store.Map

func (*Pipeline) Init added in v0.0.14

func (p *Pipeline) Init(ctx context.Context) (err error)

func (*Pipeline) InitTier1StoresAndBackprocess added in v1.1.9

func (p *Pipeline) InitTier1StoresAndBackprocess(ctx context.Context, reqPlan *plan.RequestPlan, noopMode bool) (bool, error)

func (*Pipeline) InitTier2Stores added in v1.1.9

func (p *Pipeline) InitTier2Stores(ctx context.Context) (err error)

func (*Pipeline) LastCursor added in v1.16.5

func (p *Pipeline) LastCursor() *bstream.Cursor

func (*Pipeline) OnStreamTerminated added in v0.1.0

func (p *Pipeline) OnStreamTerminated(ctx context.Context, err error) error

OnStreamTerminated performs flush of store and setting trailers when the stream terminated gracefully from our point of view. If the stream terminated gracefully, we return `nil` otherwise, the original is returned.

func (*Pipeline) ProcessBlock added in v0.0.14

func (p *Pipeline) ProcessBlock(block *pbbstream.Block, obj interface{}) (err error)

func (*Pipeline) ProcessFromExecOutput added in v1.4.0

func (p *Pipeline) ProcessFromExecOutput(
	ctx context.Context,
	clock *pbsubstreams.Clock,
	cursor *bstream.Cursor,
) (err error)

type Stores added in v0.1.0

type Stores struct {
	StoreMap store.Map
	// contains filtered or unexported fields
}

func NewStores added in v0.1.0

func NewStores(ctx context.Context, storeConfigs store.ConfigMap, storeSnapshotSaveInterval, requestStartBlockNum, stopBlockNum uint64, isTier2Request bool, storesToWrite map[string]struct{}) *Stores

func (*Stores) SetStoreMap added in v0.1.0

func (s *Stores) SetStoreMap(storeMap store.Map)

type UndoHandler added in v0.0.21

type UndoHandler func(clock *pbsubstreams.Clock, moduleOutputs []*pbssinternal.ModuleOutput)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL