orchestrator

package
v0.0.7-beta Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 30, 2022 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetRequestStream

func GetRequestStream(ctx context.Context, strategy Strategy) <-chan *pbsubstreams.Request

func StartQueue

func StartQueue(ctx context.Context, in <-chan *QueueItem, out chan<- *QueueItem)

Types

type BlockWaiter

type BlockWaiter struct {
	// contains filtered or unexported fields
}

func NewWaiter

func NewWaiter(blockNum uint64, lastSavedBlockMap map[string]uint64, stores ...*pbsubstreams.Module) *BlockWaiter

func (*BlockWaiter) Order

func (w *BlockWaiter) Order() int

func (*BlockWaiter) Signal

func (w *BlockWaiter) Signal(storeName string, blockNum uint64)

func (*BlockWaiter) Wait

func (w *BlockWaiter) Wait(ctx context.Context) <-chan interface{}

type LinearStrategy

type LinearStrategy struct {
	// contains filtered or unexported fields
}

func NewLinearStrategy

func NewLinearStrategy(ctx context.Context, request *pbsubstreams.Request, builders []*state.Builder, upToBlockNum uint64, blockRangeSizeSubRequests int) (*LinearStrategy, error)

func (*LinearStrategy) GetNextRequest

func (s *LinearStrategy) GetNextRequest(ctx context.Context) (*pbsubstreams.Request, error)

func (*LinearStrategy) RequestCount

func (s *LinearStrategy) RequestCount() int

type Notifier

type Notifier interface {
	Notify(builder string, blockNum uint64)
}

type OrderedStrategy

type OrderedStrategy struct {
	// contains filtered or unexported fields
}

func NewOrderedStrategy

func NewOrderedStrategy(
	ctx context.Context,
	request *pbsubstreams.Request,
	builders []*state.Builder,
	graph *manifest.ModuleGraph,
	pool *RequestPool,
	upToBlockNum uint64,
	blockRangeSizeSubRequests int,
	maxRangeSize uint64,
) (*OrderedStrategy, error)

func (*OrderedStrategy) GetNextRequest

func (d *OrderedStrategy) GetNextRequest(ctx context.Context) (*pbsubstreams.Request, error)

func (*OrderedStrategy) RequestCount

func (d *OrderedStrategy) RequestCount() int

type PriorityQueue

type PriorityQueue []*QueueItem

A PriorityQueue implements heap.Interface and holds Items.

func (PriorityQueue) Len

func (pq PriorityQueue) Len() int

func (PriorityQueue) Less

func (pq PriorityQueue) Less(i, j int) bool

func (*PriorityQueue) Pop

func (pq *PriorityQueue) Pop() interface{}

func (*PriorityQueue) Push

func (pq *PriorityQueue) Push(x interface{})

func (PriorityQueue) Swap

func (pq PriorityQueue) Swap(i, j int)

type QueueItem

type QueueItem struct {
	Request  *pbsubstreams.Request // The value of the item; arbitrary.
	Priority int                   // The Priority of the item in the queue.
	// contains filtered or unexported fields
}

type RequestGetter

type RequestGetter interface {
	Get(ctx context.Context) (*pbsubstreams.Request, error)
	Count() int
}

type RequestPool

type RequestPool struct {
	// contains filtered or unexported fields
}

func NewRequestPool

func NewRequestPool() *RequestPool

func (*RequestPool) Add

func (p *RequestPool) Add(ctx context.Context, request *pbsubstreams.Request, waiter Waiter) error

func (*RequestPool) Count

func (p *RequestPool) Count() int

func (*RequestPool) Get

func (*RequestPool) Notify

func (p *RequestPool) Notify(builder string, blockNum uint64)

func (*RequestPool) Start

func (p *RequestPool) Start(ctx context.Context)

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(ctx context.Context, strategy Strategy, squasher *Squasher, blockRangeSizeSubRequests int) (*Scheduler, error)

func (*Scheduler) Callback

func (s *Scheduler) Callback(ctx context.Context, r *pbsubstreams.Request) error

func (*Scheduler) Next

func (s *Scheduler) Next() (*pbsubstreams.Request, error)

type Squashable

type Squashable struct {
	// contains filtered or unexported fields
}

func NewSquashable

func NewSquashable(initialBuilder *state.Builder) *Squashable

func (*Squashable) IsEmpty

func (s *Squashable) IsEmpty() bool

func (*Squashable) String

func (s *Squashable) String() string

type Squashables

type Squashables []*Squashable

func (Squashables) String

func (s Squashables) String() string

type Squasher

type Squasher struct {
	// contains filtered or unexported fields
}

func NewSquasher

func NewSquasher(ctx context.Context, builders []*state.Builder, outputCaches map[string]*outputs.OutputCache, storeSaveInterval uint64, opts ...SquasherOption) (*Squasher, error)

func (*Squasher) Close

func (s *Squasher) Close() error

func (*Squasher) Squash

func (s *Squasher) Squash(ctx context.Context, moduleName string, requestBlockRange *block.Range) error

type SquasherOption

type SquasherOption func(s *Squasher)

func WithNotifier

func WithNotifier(notifier Notifier) SquasherOption

type Strategy

type Strategy interface {
	GetNextRequest(ctx context.Context) (*pbsubstreams.Request, error)
	RequestCount() int
}

type Waiter

type Waiter interface {
	Wait(ctx context.Context) <-chan interface{}
	Signal(storeName string, blockNum uint64)
	Order() int
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL