Documentation
¶
Index ¶
- func ComputeStoreExclusiveEndBlock(lastSavedBlock, reqStartBlock, saveInterval, moduleInitialBlock uint64) uint64
- func GetRequestStream(ctx context.Context, strategy Strategy) <-chan *pbsubstreams.Request
- func StartQueue(ctx context.Context, in <-chan *QueueItem, out chan<- *QueueItem)
- type BlockWaiter
- type Notifier
- type OrderedStrategy
- type PriorityQueue
- type QueueItem
- type RequestGetter
- type RequestPool
- func (p *RequestPool) Add(ctx context.Context, reverseIdx int, request *pbsubstreams.Request, ...) error
- func (p *RequestPool) Count() int
- func (p *RequestPool) Get(ctx context.Context) (*pbsubstreams.Request, error)
- func (p *RequestPool) Notify(builder string, blockNum uint64)
- func (p *RequestPool) Start(ctx context.Context)
- func (p *RequestPool) State() string
- type Scheduler
- type Squashable
- type Squashables
- type Squasher
- type SquasherOption
- type StorageState
- type Strategy
- type Waiter
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GetRequestStream ¶
func GetRequestStream(ctx context.Context, strategy Strategy) <-chan *pbsubstreams.Request
Types ¶
type BlockWaiter ¶
type BlockWaiter struct {
// contains filtered or unexported fields
}
func NewWaiter ¶
func NewWaiter(blockNum uint64, storageState *StorageState, stores ...*pbsubstreams.Module) *BlockWaiter
func (*BlockWaiter) Order ¶
func (w *BlockWaiter) Order() int
func (*BlockWaiter) Signal ¶
func (w *BlockWaiter) Signal(storeName string, blockNum uint64)
func (*BlockWaiter) String ¶
func (w *BlockWaiter) String() string
func (*BlockWaiter) Wait ¶
func (w *BlockWaiter) Wait(ctx context.Context) <-chan interface{}
type OrderedStrategy ¶
type OrderedStrategy struct {
// contains filtered or unexported fields
}
func NewOrderedStrategy ¶
func NewOrderedStrategy( ctx context.Context, storageState *StorageState, request *pbsubstreams.Request, stores map[string]*state.Store, graph *manifest.ModuleGraph, pool *RequestPool, upToBlockNum uint64, storeSaveInterval uint64, blockRangeSizeSubRequests int, maxRangeSize uint64, ) (*OrderedStrategy, error)
func (*OrderedStrategy) GetNextRequest ¶
func (d *OrderedStrategy) GetNextRequest(ctx context.Context) (*pbsubstreams.Request, error)
type PriorityQueue ¶
type PriorityQueue []*QueueItem
A PriorityQueue implements heap.Interface and holds Items.
func (PriorityQueue) Len ¶
func (pq PriorityQueue) Len() int
func (PriorityQueue) Less ¶
func (pq PriorityQueue) Less(i, j int) bool
func (*PriorityQueue) Pop ¶
func (pq *PriorityQueue) Pop() interface{}
func (*PriorityQueue) Push ¶
func (pq *PriorityQueue) Push(x interface{})
func (PriorityQueue) Swap ¶
func (pq PriorityQueue) Swap(i, j int)
type QueueItem ¶
type QueueItem struct {
Request *pbsubstreams.Request // The value of the item; arbitrary.
Priority int // The Priority of the item in the queue.
// contains filtered or unexported fields
}
type RequestGetter ¶
type RequestGetter interface {
Get(ctx context.Context) (*pbsubstreams.Request, error)
}
type RequestPool ¶
type RequestPool struct {
// contains filtered or unexported fields
}
func NewRequestPool ¶
func NewRequestPool() *RequestPool
func (*RequestPool) Add ¶
func (p *RequestPool) Add(ctx context.Context, reverseIdx int, request *pbsubstreams.Request, waiter Waiter) error
func (*RequestPool) Count ¶
func (p *RequestPool) Count() int
func (*RequestPool) Get ¶
func (p *RequestPool) Get(ctx context.Context) (*pbsubstreams.Request, error)
func (*RequestPool) Notify ¶
func (p *RequestPool) Notify(builder string, blockNum uint64)
func (*RequestPool) Start ¶
func (p *RequestPool) Start(ctx context.Context)
func (*RequestPool) State ¶
func (p *RequestPool) State() string
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
func NewScheduler ¶
type Squashable ¶
func NewSquashable ¶
func NewSquashable(initialStore *state.Store, targetExclusiveBlock, storeSaveInterval, nextExpectedStartBlock uint64) *Squashable
func (*Squashable) IsEmpty ¶
func (s *Squashable) IsEmpty() bool
func (*Squashable) String ¶
func (s *Squashable) String() string
type Squashables ¶
type Squashables []*Squashable
func (Squashables) String ¶
func (s Squashables) String() string
type Squasher ¶
type Squasher struct {
// contains filtered or unexported fields
}
Squasher produces _complete_ stores, by merging backing partial stores.
func NewSquasher ¶
func NewSquasher(ctx context.Context, storageState *StorageState, stores map[string]*state.Store, storeSaveInterval uint64, reqStartBlock uint64, opts ...SquasherOption) (*Squasher, error)
NewSquasher receives stores, initializes them and fetches them from the existing storage. It prepares itself to receive Squash() requests that should correspond to what is missing for those stores to reach `targetExclusiveBlock`. This is managed externally by the Scheduler/Strategy. Eventually, ideally, all components are synchronizes around the actually data: the state of storages present, the requests needed to fill in those stores up to the target block, etc..
type SquasherOption ¶
type SquasherOption func(s *Squasher)
func WithNotifier ¶
func WithNotifier(notifier Notifier) SquasherOption
type StorageState ¶
type StorageState struct {
// contains filtered or unexported fields
}
func FetchStorageState ¶
func NewStorageState ¶
func NewStorageState() *StorageState
func (*StorageState) ProgressMessages ¶
func (s *StorageState) ProgressMessages() (out []*pbsubstreams.ModuleProgress)
Source Files
¶
Click to show internal directories.
Click to hide internal directories.