Documentation
¶
Index ¶
- func GetRequestStream(ctx context.Context, strategy Strategy) <-chan *pbsubstreams.Request
- func StartQueue(ctx context.Context, in <-chan *QueueItem, out chan<- *QueueItem)
- type BlockWaiter
- type LinearStrategy
- type Notifier
- type OrderedStrategy
- type PriorityQueue
- type QueueItem
- type RequestGetter
- type RequestPool
- func (p *RequestPool) Add(ctx context.Context, request *pbsubstreams.Request, waiter Waiter) error
- func (p *RequestPool) Count() int
- func (p *RequestPool) Get(ctx context.Context) (*pbsubstreams.Request, error)
- func (p *RequestPool) Notify(builder string, blockNum uint64)
- func (p *RequestPool) Start(ctx context.Context)
- type Scheduler
- type Squashable
- type Squashables
- type Squasher
- type SquasherOption
- type Strategy
- type Waiter
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GetRequestStream ¶
func GetRequestStream(ctx context.Context, strategy Strategy) <-chan *pbsubstreams.Request
Types ¶
type BlockWaiter ¶
type BlockWaiter struct {
// contains filtered or unexported fields
}
func NewWaiter ¶
func NewWaiter(blockNum uint64, lastSavedBlockMap map[string]uint64, stores ...*pbsubstreams.Module) *BlockWaiter
func (*BlockWaiter) Order ¶
func (w *BlockWaiter) Order() int
func (*BlockWaiter) Signal ¶
func (w *BlockWaiter) Signal(storeName string, blockNum uint64)
func (*BlockWaiter) Wait ¶
func (w *BlockWaiter) Wait(ctx context.Context) <-chan interface{}
type LinearStrategy ¶
type LinearStrategy struct {
// contains filtered or unexported fields
}
func NewLinearStrategy ¶
func NewLinearStrategy(ctx context.Context, request *pbsubstreams.Request, builders []*state.Builder, upToBlockNum uint64, blockRangeSizeSubRequests int) (*LinearStrategy, error)
func (*LinearStrategy) GetNextRequest ¶
func (s *LinearStrategy) GetNextRequest(ctx context.Context) (*pbsubstreams.Request, error)
func (*LinearStrategy) RequestCount ¶
func (s *LinearStrategy) RequestCount() int
type OrderedStrategy ¶
type OrderedStrategy struct {
// contains filtered or unexported fields
}
func NewOrderedStrategy ¶
func NewOrderedStrategy( ctx context.Context, request *pbsubstreams.Request, builders []*state.Builder, graph *manifest.ModuleGraph, pool *RequestPool, upToBlockNum uint64, blockRangeSizeSubRequests int, maxRangeSize uint64, ) (*OrderedStrategy, error)
func (*OrderedStrategy) GetNextRequest ¶
func (d *OrderedStrategy) GetNextRequest(ctx context.Context) (*pbsubstreams.Request, error)
func (*OrderedStrategy) RequestCount ¶
func (d *OrderedStrategy) RequestCount() int
type PriorityQueue ¶
type PriorityQueue []*QueueItem
A PriorityQueue implements heap.Interface and holds Items.
func (PriorityQueue) Len ¶
func (pq PriorityQueue) Len() int
func (PriorityQueue) Less ¶
func (pq PriorityQueue) Less(i, j int) bool
func (*PriorityQueue) Pop ¶
func (pq *PriorityQueue) Pop() interface{}
func (*PriorityQueue) Push ¶
func (pq *PriorityQueue) Push(x interface{})
func (PriorityQueue) Swap ¶
func (pq PriorityQueue) Swap(i, j int)
type QueueItem ¶
type QueueItem struct {
Request *pbsubstreams.Request // The value of the item; arbitrary.
Priority int // The Priority of the item in the queue.
// contains filtered or unexported fields
}
type RequestGetter ¶
type RequestPool ¶
type RequestPool struct {
// contains filtered or unexported fields
}
func NewRequestPool ¶
func NewRequestPool() *RequestPool
func (*RequestPool) Add ¶
func (p *RequestPool) Add(ctx context.Context, request *pbsubstreams.Request, waiter Waiter) error
func (*RequestPool) Count ¶
func (p *RequestPool) Count() int
func (*RequestPool) Get ¶
func (p *RequestPool) Get(ctx context.Context) (*pbsubstreams.Request, error)
func (*RequestPool) Notify ¶
func (p *RequestPool) Notify(builder string, blockNum uint64)
func (*RequestPool) Start ¶
func (p *RequestPool) Start(ctx context.Context)
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
func NewScheduler ¶
type Squashable ¶
type Squashable struct {
// contains filtered or unexported fields
}
func NewSquashable ¶
func NewSquashable(initialBuilder *state.Builder) *Squashable
func (*Squashable) IsEmpty ¶
func (s *Squashable) IsEmpty() bool
func (*Squashable) String ¶
func (s *Squashable) String() string
type Squashables ¶
type Squashables []*Squashable
func (Squashables) String ¶
func (s Squashables) String() string
type Squasher ¶
type Squasher struct {
// contains filtered or unexported fields
}
func NewSquasher ¶
type SquasherOption ¶
type SquasherOption func(s *Squasher)
func WithNotifier ¶
func WithNotifier(notifier Notifier) SquasherOption
Source Files
¶
Click to show internal directories.
Click to hide internal directories.