orchestrator

package
v0.0.13 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 3, 2022 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func StartQueue

func StartQueue(ctx context.Context, in <-chan *QueueItem, out chan<- *QueueItem)

Types

type BlockWaiter

type BlockWaiter struct {
	// contains filtered or unexported fields
}

func NewWaiter

func NewWaiter(blockNum uint64, stores ...*pbsubstreams.Module) *BlockWaiter

func (*BlockWaiter) Order

func (w *BlockWaiter) Order() int

func (*BlockWaiter) Signal

func (w *BlockWaiter) Signal(storeName string, blockNum uint64)

func (*BlockWaiter) String

func (w *BlockWaiter) String() string

func (*BlockWaiter) Wait

func (w *BlockWaiter) Wait(ctx context.Context) <-chan interface{}

type Job added in v0.0.13

type Job struct {
	// contains filtered or unexported fields
}

func (*Job) MarshalLogObject added in v0.0.13

func (j *Job) MarshalLogObject(enc zapcore.ObjectEncoder) error

type Notifier

type Notifier interface {
	Notify(builder string, blockNum uint64)
}

type OrderedStrategy

type OrderedStrategy struct {
	// contains filtered or unexported fields
}

func NewOrderedStrategy

func NewOrderedStrategy(
	ctx context.Context,
	splitWorks SplitWorkModules,
	request *pbsubstreams.Request,
	stores map[string]*state.Store,
	graph *manifest.ModuleGraph,
	pool *RequestPool,
) (*OrderedStrategy, error)

type PriorityQueue

type PriorityQueue []*QueueItem

A PriorityQueue implements heap.Interface and holds Items.

func (PriorityQueue) Len

func (pq PriorityQueue) Len() int

func (PriorityQueue) Less

func (pq PriorityQueue) Less(i, j int) bool

func (*PriorityQueue) Pop

func (pq *PriorityQueue) Pop() interface{}

func (*PriorityQueue) Push

func (pq *PriorityQueue) Push(x interface{})

func (PriorityQueue) Swap

func (pq PriorityQueue) Swap(i, j int)

type QueueItem

type QueueItem struct {
	Priority int // The Priority of the item in the queue.
	// contains filtered or unexported fields
}

type RequestPool

type RequestPool struct {
	// contains filtered or unexported fields
}

func NewRequestPool

func NewRequestPool() *RequestPool

func (*RequestPool) Add

func (p *RequestPool) Add(ctx context.Context, reverseIdx int, job *Job, waiter Waiter) error

func (*RequestPool) Count

func (p *RequestPool) Count() int

func (*RequestPool) Notify

func (p *RequestPool) Notify(builder string, blockNum uint64)

func (*RequestPool) Start

func (p *RequestPool) Start(ctx context.Context)

func (*RequestPool) State

func (p *RequestPool) State() string

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(ctx context.Context, strategy *OrderedStrategy, squasher *Squasher, workerPool *WorkerPool, respFunc substreams.ResponseFunc, blockRangeSizeSubRequests int) (*Scheduler, error)

func (*Scheduler) Callback

func (s *Scheduler) Callback(ctx context.Context, job *Job) error

func (*Scheduler) Launch

func (s *Scheduler) Launch(ctx context.Context, result chan error)

func (*Scheduler) Next

func (s *Scheduler) Next() *Job

type SplitWork added in v0.0.13

type SplitWork struct {
	// contains filtered or unexported fields
}

FIXME(abourget): StoreWorkUnit ?

func SplitSomeWork added in v0.0.13

func SplitSomeWork(modName string, storeSplit, subreqSplit, modInitBlock, storeLastBlock, incomingReqStartBlock uint64) (out *SplitWork)

type SplitWorkModules added in v0.0.13

type SplitWorkModules map[string]*SplitWork

func (SplitWorkModules) ProgressMessages added in v0.0.13

func (mods SplitWorkModules) ProgressMessages() (out []*pbsubstreams.ModuleProgress)

type Splitter added in v0.0.13

type Splitter struct {
	// contains filtered or unexported fields
}

func NewSplitter added in v0.0.13

func NewSplitter(chunkSize uint64) *Splitter

func (*Splitter) Split added in v0.0.13

func (s *Splitter) Split(moduleInitialBlock uint64, lastSavedBlock uint64, blockRange *block.Range) []*block.Range

type Squashable

type Squashable struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewSquashable

func NewSquashable(initialStore *state.Store, targetExclusiveBlock, nextExpectedStartBlock uint64, notifier Notifier) *Squashable

func (*Squashable) IsEmpty

func (s *Squashable) IsEmpty() bool

func (*Squashable) String

func (s *Squashable) String() string

type Squashables

type Squashables []*Squashable

func (Squashables) String

func (s Squashables) String() string

type Squasher

type Squasher struct {
	// contains filtered or unexported fields
}

Squasher produces _complete_ stores, by merging backing partial stores.

func NewSquasher

func NewSquasher(ctx context.Context, splitWorks SplitWorkModules, stores map[string]*state.Store, reqStartBlock uint64, notifier Notifier) (*Squasher, error)

NewSquasher receives stores, initializes them and fetches them from the existing storage. It prepares itself to receive Squash() requests that should correspond to what is missing for those stores to reach `targetExclusiveBlock`. This is managed externally by the Scheduler/Strategy. Eventually, ideally, all components are synchronizes around the actually data: the state of storages present, the requests needed to fill in those stores up to the target block, etc..

func (*Squasher) Squash

func (s *Squasher) Squash(ctx context.Context, moduleName string, reqChunk *reqChunk) error

func (*Squasher) StoresReady

func (s *Squasher) StoresReady() (out map[string]*state.Store, err error)

type StorageState

type StorageState struct {
	// contains filtered or unexported fields
}

func FetchStorageState

func FetchStorageState(ctx context.Context, stores map[string]*state.Store) (out *StorageState, err error)

func NewStorageState

func NewStorageState() *StorageState

func (*StorageState) LastBlock added in v0.0.13

func (s *StorageState) LastBlock(modName string) uint64

type Waiter

type Waiter interface {
	Wait(ctx context.Context) <-chan interface{}
	Signal(storeName string, blockNum uint64)
	Order() int
	String() string
}

type Worker added in v0.0.13

type Worker struct {
	// contains filtered or unexported fields
}

func (*Worker) Run added in v0.0.13

func (w *Worker) Run(ctx context.Context, job *Job, respFunc substreams.ResponseFunc) error

type WorkerPool added in v0.0.13

type WorkerPool struct {
	// contains filtered or unexported fields
}

func NewWorkerPool added in v0.0.13

func NewWorkerPool(workerCount int, originalRequestModules *pbsubstreams.Modules, grpcClientFactory func() (pbsubstreams.StreamClient, []grpc.CallOption, error)) *WorkerPool

func (*WorkerPool) Borrow added in v0.0.13

func (p *WorkerPool) Borrow() *Worker

func (*WorkerPool) ReturnWorker added in v0.0.13

func (p *WorkerPool) ReturnWorker(worker *Worker)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL