Documentation
¶
Index ¶
- func StartQueue(ctx context.Context, in <-chan *QueueItem, out chan<- *QueueItem)
- type BlockWaiter
- type Job
- type Notifier
- type OrderedStrategy
- type PriorityQueue
- type QueueItem
- type RequestPool
- type Scheduler
- type SplitWork
- type SplitWorkModules
- type Splitter
- type Squashable
- type Squashables
- type Squasher
- type StorageState
- type Waiter
- type Worker
- type WorkerPool
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type BlockWaiter ¶
type BlockWaiter struct {
// contains filtered or unexported fields
}
func NewWaiter ¶
func NewWaiter(blockNum uint64, stores ...*pbsubstreams.Module) *BlockWaiter
func (*BlockWaiter) Order ¶
func (w *BlockWaiter) Order() int
func (*BlockWaiter) Signal ¶
func (w *BlockWaiter) Signal(storeName string, blockNum uint64)
func (*BlockWaiter) String ¶
func (w *BlockWaiter) String() string
func (*BlockWaiter) Wait ¶
func (w *BlockWaiter) Wait(ctx context.Context) <-chan interface{}
type Job ¶ added in v0.0.13
type Job struct {
// contains filtered or unexported fields
}
func (*Job) MarshalLogObject ¶ added in v0.0.13
func (j *Job) MarshalLogObject(enc zapcore.ObjectEncoder) error
type OrderedStrategy ¶
type OrderedStrategy struct {
// contains filtered or unexported fields
}
func NewOrderedStrategy ¶
func NewOrderedStrategy( ctx context.Context, splitWorks SplitWorkModules, request *pbsubstreams.Request, stores map[string]*state.Store, graph *manifest.ModuleGraph, pool *RequestPool, ) (*OrderedStrategy, error)
type PriorityQueue ¶
type PriorityQueue []*QueueItem
A PriorityQueue implements heap.Interface and holds Items.
func (PriorityQueue) Len ¶
func (pq PriorityQueue) Len() int
func (PriorityQueue) Less ¶
func (pq PriorityQueue) Less(i, j int) bool
func (*PriorityQueue) Pop ¶
func (pq *PriorityQueue) Pop() interface{}
func (*PriorityQueue) Push ¶
func (pq *PriorityQueue) Push(x interface{})
func (PriorityQueue) Swap ¶
func (pq PriorityQueue) Swap(i, j int)
type QueueItem ¶
type QueueItem struct {
Priority int // The Priority of the item in the queue.
// contains filtered or unexported fields
}
type RequestPool ¶
type RequestPool struct {
// contains filtered or unexported fields
}
func NewRequestPool ¶
func NewRequestPool() *RequestPool
func (*RequestPool) Count ¶
func (p *RequestPool) Count() int
func (*RequestPool) Notify ¶
func (p *RequestPool) Notify(builder string, blockNum uint64)
func (*RequestPool) Start ¶
func (p *RequestPool) Start(ctx context.Context)
func (*RequestPool) State ¶
func (p *RequestPool) State() string
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
func NewScheduler ¶
func NewScheduler(ctx context.Context, strategy *OrderedStrategy, squasher *Squasher, workerPool *WorkerPool, respFunc substreams.ResponseFunc, blockRangeSizeSubRequests int) (*Scheduler, error)
type SplitWork ¶ added in v0.0.13
type SplitWork struct {
// contains filtered or unexported fields
}
FIXME(abourget): StoreWorkUnit ?
func SplitSomeWork ¶ added in v0.0.13
type SplitWorkModules ¶ added in v0.0.13
func (SplitWorkModules) ProgressMessages ¶ added in v0.0.13
func (mods SplitWorkModules) ProgressMessages() (out []*pbsubstreams.ModuleProgress)
type Splitter ¶ added in v0.0.13
type Splitter struct {
// contains filtered or unexported fields
}
func NewSplitter ¶ added in v0.0.13
type Squashable ¶
func NewSquashable ¶
func NewSquashable(initialStore *state.Store, targetExclusiveBlock, nextExpectedStartBlock uint64, notifier Notifier) *Squashable
func (*Squashable) IsEmpty ¶
func (s *Squashable) IsEmpty() bool
func (*Squashable) String ¶
func (s *Squashable) String() string
type Squashables ¶
type Squashables []*Squashable
func (Squashables) String ¶
func (s Squashables) String() string
type Squasher ¶
type Squasher struct {
// contains filtered or unexported fields
}
Squasher produces _complete_ stores, by merging backing partial stores.
func NewSquasher ¶
func NewSquasher(ctx context.Context, splitWorks SplitWorkModules, stores map[string]*state.Store, reqStartBlock uint64, notifier Notifier) (*Squasher, error)
NewSquasher receives stores, initializes them and fetches them from the existing storage. It prepares itself to receive Squash() requests that should correspond to what is missing for those stores to reach `targetExclusiveBlock`. This is managed externally by the Scheduler/Strategy. Eventually, ideally, all components are synchronizes around the actually data: the state of storages present, the requests needed to fill in those stores up to the target block, etc..
type StorageState ¶
type StorageState struct {
// contains filtered or unexported fields
}
func FetchStorageState ¶
func NewStorageState ¶
func NewStorageState() *StorageState
func (*StorageState) LastBlock ¶ added in v0.0.13
func (s *StorageState) LastBlock(modName string) uint64
type Worker ¶ added in v0.0.13
type Worker struct {
// contains filtered or unexported fields
}
func (*Worker) Run ¶ added in v0.0.13
func (w *Worker) Run(ctx context.Context, job *Job, respFunc substreams.ResponseFunc) error
type WorkerPool ¶ added in v0.0.13
type WorkerPool struct {
// contains filtered or unexported fields
}
func NewWorkerPool ¶ added in v0.0.13
func NewWorkerPool(workerCount int, originalRequestModules *pbsubstreams.Modules, grpcClientFactory func() (pbsubstreams.StreamClient, []grpc.CallOption, error)) *WorkerPool
func (*WorkerPool) Borrow ¶ added in v0.0.13
func (p *WorkerPool) Borrow() *Worker
func (*WorkerPool) ReturnWorker ¶ added in v0.0.13
func (p *WorkerPool) ReturnWorker(worker *Worker)
Click to show internal directories.
Click to hide internal directories.