orchestrator

package
v0.0.21 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 31, 2022 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var PartialChunksDone = errors.New("partial chunks done")
View Source
var SkipRange = errors.New("skip range")

Functions

This section is empty.

Types

type Backprocessor added in v0.0.21

type Backprocessor struct {
	// contains filtered or unexported fields
}

func BuildBackprocessor added in v0.0.21

func BuildBackprocessor(
	ctx context.Context,
	runtimeConfig config.RuntimeConfig,
	upToBlock uint64,
	graph *manifest.ModuleGraph,
	respFunc func(resp *pbsubstreams.Response) error,
	storeConfigs store.ConfigMap,
	upstreamRequestModules *pbsubstreams.Modules,
) (*Backprocessor, error)

func (*Backprocessor) Run added in v0.0.21

func (b *Backprocessor) Run(ctx context.Context) (store.Map, error)

TODO(abourget): WARN: this function should NOT GROW in functionality, or abstraction levels.

type MultiSquasher added in v0.0.21

type MultiSquasher struct {
	// contains filtered or unexported fields
}

MultiSquasher produces _complete_ stores, by merging backing partial stores.

func NewMultiSquasher added in v0.0.21

func NewMultiSquasher(
	ctx context.Context,
	runtimeConfig config.RuntimeConfig,
	modulesStorageStateMap work.ModuleStorageStateMap,
	storeConfigs store.ConfigMap,
	upToBlock uint64,
	onStoreCompletedUntilBlock func(storeName string, blockNum uint64),
) (*MultiSquasher, error)

NewMultiSquasher receives stores, initializes them and fetches them from the existing storage. It prepares itself to receive Squash() requests that should correspond to what is missing for those stores to reach `targetExclusiveEndBlock`. This is managed externally by the Scheduler/Strategy. Eventually, ideally, all components are synchronizes around the actual data: the state of storages present, the requests needed to fill in those stores up to the target block, etc..

func (*MultiSquasher) Launch added in v0.0.21

func (s *MultiSquasher) Launch(ctx context.Context)

func (*MultiSquasher) Squash added in v0.0.21

func (s *MultiSquasher) Squash(moduleName string, partialsRanges block.Ranges) error

func (*MultiSquasher) Wait added in v0.0.21

func (s *MultiSquasher) Wait(ctx context.Context) (out store.Map, err error)

type Scheduler

type Scheduler struct {
	OnStoreJobTerminated func(moduleName string, partialsWritten block.Ranges) error
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(workPlan *work.Plan, respFunc substreams.ResponseFunc, upstreamRequestModules *pbsubstreams.Modules) *Scheduler

func (*Scheduler) OnStoreCompletedUntilBlock added in v0.0.21

func (s *Scheduler) OnStoreCompletedUntilBlock(storeName string, blockNum uint64)

OnStoreCompletedUntilBlock is called to indicate that the given storeName has snapshots at the `storeSaveIntervals` up to `blockNum` here.

This should unlock all jobs that were dependent

func (*Scheduler) Schedule added in v0.0.21

func (s *Scheduler) Schedule(ctx context.Context, pool work.WorkerPool) (err error)

type StoreSquasher added in v0.0.14

type StoreSquasher struct {
	// contains filtered or unexported fields
}

func NewStoreSquasher added in v0.0.14

func NewStoreSquasher(
	initialStore *store.FullKV,
	targetExclusiveBlock,
	nextExpectedStartBlock uint64,
	storeSaveInterval uint64,
	onStoreCompletedUntilBlock func(storeName string, blockNum uint64),
) *StoreSquasher

func (*StoreSquasher) IsEmpty added in v0.0.14

func (s *StoreSquasher) IsEmpty() bool

func (*StoreSquasher) String added in v0.0.14

func (s *StoreSquasher) String() string

func (*StoreSquasher) WaitForCompletion added in v0.0.21

func (s *StoreSquasher) WaitForCompletion(ctx context.Context) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL