orchestrator

package
v0.0.9-beta Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 2, 2022 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ComputeStoreExclusiveEndBlock

func ComputeStoreExclusiveEndBlock(lastSavedBlock, reqStartBlock, saveInterval, moduleInitialBlock uint64) uint64

func GetRequestStream

func GetRequestStream(ctx context.Context, strategy Strategy) <-chan *pbsubstreams.Request

func StartQueue

func StartQueue(ctx context.Context, in <-chan *QueueItem, out chan<- *QueueItem)

Types

type BlockWaiter

type BlockWaiter struct {
	// contains filtered or unexported fields
}

func NewWaiter

func NewWaiter(blockNum uint64, storageState *StorageState, stores ...*pbsubstreams.Module) *BlockWaiter

func (*BlockWaiter) Order

func (w *BlockWaiter) Order() int

func (*BlockWaiter) Signal

func (w *BlockWaiter) Signal(storeName string, blockNum uint64)

func (*BlockWaiter) String

func (w *BlockWaiter) String() string

func (*BlockWaiter) Wait

func (w *BlockWaiter) Wait(ctx context.Context) <-chan interface{}

type Notifier

type Notifier interface {
	Notify(builder string, blockNum uint64)
}

type OrderedStrategy

type OrderedStrategy struct {
	// contains filtered or unexported fields
}

func NewOrderedStrategy

func NewOrderedStrategy(
	ctx context.Context,
	storageState *StorageState,
	request *pbsubstreams.Request,
	stores map[string]*state.Store,
	graph *manifest.ModuleGraph,
	pool *RequestPool,
	upToBlockNum uint64,
	blockRangeSizeSubRequests int,
	maxRangeSize uint64,
) (*OrderedStrategy, error)

func (*OrderedStrategy) GetNextRequest

func (d *OrderedStrategy) GetNextRequest(ctx context.Context) (*pbsubstreams.Request, error)

func (*OrderedStrategy) RequestCount

func (d *OrderedStrategy) RequestCount() int

type PriorityQueue

type PriorityQueue []*QueueItem

PriorityQueue implements heap.Interface and holds QueueItems.

func (PriorityQueue) Len

func (pq PriorityQueue) Len() int

func (PriorityQueue) Less

func (pq PriorityQueue) Less(i, j int) bool

func (*PriorityQueue) Pop

func (pq *PriorityQueue) Pop() interface{}

func (*PriorityQueue) Push

func (pq *PriorityQueue) Push(x interface{})

func (PriorityQueue) Swap

func (pq PriorityQueue) Swap(i, j int)

type QueueItem

type QueueItem struct {
	Request  *pbsubstreams.Request // The value of the item; arbitrary.
	Priority int                   // The Priority of the item in the queue.
	// contains filtered or unexported fields
}

type RequestGetter

type RequestGetter interface {
	Get(ctx context.Context) (*pbsubstreams.Request, error)
	Count() int
}

type RequestPool

type RequestPool struct {
	// contains filtered or unexported fields
}

func NewRequestPool

func NewRequestPool() *RequestPool

func (*RequestPool) Add

func (p *RequestPool) Add(ctx context.Context, request *pbsubstreams.Request, waiter Waiter) error

func (*RequestPool) Count

func (p *RequestPool) Count() int

func (*RequestPool) Get

func (p *RequestPool) Get(ctx context.Context) (*pbsubstreams.Request, error)

func (*RequestPool) Notify

func (p *RequestPool) Notify(builder string, blockNum uint64)

func (*RequestPool) Start

func (p *RequestPool) Start(ctx context.Context)

func (*RequestPool) State

func (p *RequestPool) State() string

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(ctx context.Context, strategy Strategy, squasher *Squasher, workerPool *worker.Pool, respFunc substreams.ResponseFunc, blockRangeSizeSubRequests int) (*Scheduler, error)

func (*Scheduler) Callback

func (s *Scheduler) Callback(ctx context.Context, outgoingReq *pbsubstreams.Request) error

func (*Scheduler) Launch

func (s *Scheduler) Launch(ctx context.Context, result chan error) (out chan error)

func (*Scheduler) Next

func (s *Scheduler) Next() (*pbsubstreams.Request, error)

type Squashable

type Squashable struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewSquashable

func NewSquashable(initialStore *state.Store, targetExclusiveBlock, storeSaveInterval, nextExpectedStartBlock uint64) *Squashable

func (*Squashable) IsEmpty

func (s *Squashable) IsEmpty() bool

func (*Squashable) String

func (s *Squashable) String() string

type Squashables

type Squashables []*Squashable

func (Squashables) String

func (s Squashables) String() string

type Squasher

type Squasher struct {
	// contains filtered or unexported fields
}

Squasher produces _complete_ stores, by merging backing partial stores.

func NewSquasher

func NewSquasher(ctx context.Context, storageState *StorageState, stores map[string]*state.Store, storeSaveInterval uint64, reqStartBlock uint64, opts ...SquasherOption) (*Squasher, error)

NewSquasher receives stores, initializes them and fetches them from the existing storage. It prepares itself to receive Squash() requests that should correspond to what is missing for those stores to reach `targetExclusiveBlock`. This is managed externally by the Scheduler/Strategy. Eventually, ideally, all components are synchronized around the actual data: the state of the storages present, the requests needed to fill in those stores up to the target block, etc.

func (*Squasher) Squash

func (s *Squasher) Squash(ctx context.Context, moduleName string, outgoingReqRange *block.Range) error

func (*Squasher) StoresReady

func (s *Squasher) StoresReady() (out map[string]*state.Store, err error)

type SquasherOption

type SquasherOption func(s *Squasher)

func WithNotifier

func WithNotifier(notifier Notifier) SquasherOption

type StorageState

type StorageState struct {
	// contains filtered or unexported fields
}

func FetchStorageState

func FetchStorageState(ctx context.Context, stores map[string]*state.Store) (out *StorageState, err error)

func NewStorageState

func NewStorageState() *StorageState

func (*StorageState) ProgressMessages

func (s *StorageState) ProgressMessages() (out []*pbsubstreams.ModuleProgress)

type Strategy

type Strategy interface {
	GetNextRequest(ctx context.Context) (*pbsubstreams.Request, error)
	RequestCount() int
}

type Waiter

type Waiter interface {
	Wait(ctx context.Context) <-chan interface{}
	Signal(storeName string, blockNum uint64)
	Order() int
	String() string
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL