Documentation
¶
Index ¶
- Constants
- type AddressBatch
- type AddressProvider
- type BlockScannerOption
- func WithBatchSize(batchSize int) BlockScannerOption
- func WithCandidateScanners(candidateScanners []candidates.CandidateScanner) BlockScannerOption
- func WithChainID(chainID flow.ChainID) BlockScannerOption
- func WithContext(ctx context.Context) BlockScannerOption
- func WithContinuousScan(continuous bool) BlockScannerOption
- func WithLogger(logger zerolog.Logger) BlockScannerOption
- func WithScript(script []byte) BlockScannerOption
- func WithScriptResultHandler(handler ScriptResultHandler) BlockScannerOption
- func WithStatusReporter(reporter StatusReporter) BlockScannerOption
- type Component
- type ComponentBase
- type FullScan
- type FullScanRunner
- type IncrementalScanner
- type NoOpScriptResultHandler
- type NoOpStatusReporter
- type ProcessedAddressBatch
- type ScanConcluded
- type Scanner
- type ScriptResultHandler
- type ScriptResultProcessor
- type ScriptRunner
- type StatusReporter
- type StatusReporterOption
Constants ¶
const DefaultBatchSize = 1000
const DefaultStatusReporterPort = 2112
const FullScanReferenceBlockSwitch = 30 * time.Second
const IncrementalScannerBlockLag = 5
IncrementalScannerBlockLag is the number of blocks the incremental scanner lags behind the latest block from GetLatestBlockHeader. This is to avoid most of the "retry for collection in finalized block" errors.
const IncrementalScannerMaxBlockGap = 100
IncrementalScannerMaxBlockGap is the maximum number of blocks that can be scanned by the incremental scanner. If the gap is larger than this, the incremental scanner will request a full scan.
const ScriptRunnerMaxConcurrentScripts = 100
ScriptRunnerMaxConcurrentScripts is the maximum number of scripts that can be running concurrently at any given time. If this is more than the rate limit, some scripts will be just waiting. As long as they don't wait too long, this is not a problem.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AddressBatch ¶
type AddressBatch struct {
Addresses []flow.Address
BlockHeight uint64
// contains filtered or unexported fields
}
AddressBatch is a batch of addresses that will be the input to the script being run by the script runner at the given block height.
func NewAddressBatch ¶
func NewAddressBatch( addresses []flow.Address, blockHeight uint64, doneHandling func(), isCancelled func() bool, ) AddressBatch
func (*AddressBatch) DoneHandling ¶
func (b *AddressBatch) DoneHandling()
DoneHandling should be called when the batch has been processed.
func (*AddressBatch) ExcludeAddress ¶
func (b *AddressBatch) ExcludeAddress(address flow.Address)
func (*AddressBatch) IsValid ¶
func (b *AddressBatch) IsValid() bool
IsValid reports whether the batch should still be processed; if the batch is cancelled, it should not be processed.
type AddressProvider ¶
type AddressProvider struct {
// contains filtered or unexported fields
}
AddressProvider is used to get all the addresses that exist at a certain referenceBlockId. This relies on the fact that a certain `endOfAccountsError` will be returned by the `accountStorageUsageScript` if the address doesn't exist yet.
func InitAddressProvider ¶
func InitAddressProvider(ctx context.Context, log zerolog.Logger, chain flow.ChainID, blockHeight uint64, client client.Client, ) (*AddressProvider, error)
InitAddressProvider uses bisection to get the last existing address.
func (*AddressProvider) AddressesLen ¶
func (p *AddressProvider) AddressesLen() uint
func (*AddressProvider) GenerateAddressBatches ¶
func (p *AddressProvider) GenerateAddressBatches(addressChan chan<- []flow.Address, batchSize int)
func (*AddressProvider) GetNextAddress ¶
func (p *AddressProvider) GetNextAddress() (address flow.Address, isOutOfBounds bool)
func (*AddressProvider) LastAddress ¶
func (p *AddressProvider) LastAddress() flow.Address
type BlockScannerOption ¶
type BlockScannerOption = func(*Scanner)
func WithBatchSize ¶
func WithBatchSize(batchSize int) BlockScannerOption
func WithCandidateScanners ¶
func WithCandidateScanners(candidateScanners []candidates.CandidateScanner) BlockScannerOption
func WithChainID ¶
func WithChainID(chainID flow.ChainID) BlockScannerOption
func WithContext ¶
func WithContext(ctx context.Context) BlockScannerOption
func WithContinuousScan ¶
func WithContinuousScan(continuous bool) BlockScannerOption
func WithLogger ¶
func WithLogger(logger zerolog.Logger) BlockScannerOption
func WithScript ¶
func WithScript(script []byte) BlockScannerOption
func WithScriptResultHandler ¶
func WithScriptResultHandler(handler ScriptResultHandler) BlockScannerOption
func WithStatusReporter ¶
func WithStatusReporter(reporter StatusReporter) BlockScannerOption
type Component ¶
type Component interface {
Err() error
Done() <-chan struct{}
Started() <-chan struct{}
}
type ComponentBase ¶
func NewComponent ¶
func NewComponent(name string, logger zerolog.Logger) *ComponentBase
func (*ComponentBase) Done ¶
func (c *ComponentBase) Done() <-chan struct{}
func (*ComponentBase) Err ¶
func (c *ComponentBase) Err() error
func (*ComponentBase) Finish ¶
func (c *ComponentBase) Finish(err error)
func (*ComponentBase) Started ¶
func (c *ComponentBase) Started() <-chan struct{}
func (*ComponentBase) StartupDone ¶
func (c *ComponentBase) StartupDone()
type FullScan ¶
type FullScan struct {
*ComponentBase
// contains filtered or unexported fields
}
type FullScanRunner ¶
type FullScanRunner struct {
// contains filtered or unexported fields
}
func NewFullScanRunner ¶
func NewFullScanRunner( client client.Client, addressBatchChan chan<- AddressBatch, batchSize int, chainID flow.ChainID, reporter StatusReporter, logger zerolog.Logger, ) *FullScanRunner
func (*FullScanRunner) StartBatch ¶
func (r *FullScanRunner) StartBatch( ctx context.Context, blockHeight uint64, ) *FullScan
type IncrementalScanner ¶
type IncrementalScanner struct {
*ComponentBase
// contains filtered or unexported fields
}
func NewIncrementalScanner ¶
func NewIncrementalScanner( ctx context.Context, client client.Client, addressBatchChan chan<- AddressBatch, requestBatchChan chan<- uint64, startAtBlock uint64, batchSize int, blockCandidateScanners []candidates.CandidateScanner, reporter StatusReporter, logger zerolog.Logger, ) *IncrementalScanner
func (*IncrementalScanner) LatestHandledBlock ¶
func (r *IncrementalScanner) LatestHandledBlock() uint64
type NoOpScriptResultHandler ¶
type NoOpScriptResultHandler struct{}
func (NoOpScriptResultHandler) Handle ¶
func (d NoOpScriptResultHandler) Handle(_ ProcessedAddressBatch) error
type NoOpStatusReporter ¶
type NoOpStatusReporter struct{}
func (NoOpStatusReporter) ReportFullScanProgress ¶
func (n NoOpStatusReporter) ReportFullScanProgress(uint64, uint64)
func (NoOpStatusReporter) ReportIncrementalBlockDiff ¶
func (n NoOpStatusReporter) ReportIncrementalBlockDiff(uint64)
func (NoOpStatusReporter) ReportIncrementalBlockHeight ¶
func (n NoOpStatusReporter) ReportIncrementalBlockHeight(uint64)
func (NoOpStatusReporter) ReportIsFullScanRunning ¶
func (n NoOpStatusReporter) ReportIsFullScanRunning(bool)
type ProcessedAddressBatch ¶
type ProcessedAddressBatch struct {
AddressBatch
Result cadence.Value
}
ProcessedAddressBatch contains the result of running the script on the given batch of addresses.
type ScanConcluded ¶
type Scanner ¶ added in v0.2.0
type Scanner struct {
// contains filtered or unexported fields
}
func NewScanner ¶ added in v0.2.0
func NewScanner( client client.Client, options ...BlockScannerOption, ) *Scanner
func (*Scanner) Scan ¶ added in v0.2.0
func (scanner *Scanner) Scan() (ScanConcluded, error)
type ScriptResultHandler ¶
type ScriptResultHandler interface {
// Handle will be called concurrently for each ProcessedAddressBatch.
Handle(batch ProcessedAddressBatch) error
}
type ScriptResultProcessor ¶
type ScriptResultProcessor struct {
*ComponentBase
// contains filtered or unexported fields
}
func NewScriptResultProcessor ¶
func NewScriptResultProcessor( ctx context.Context, outChan <-chan ProcessedAddressBatch, handler ScriptResultHandler, logger zerolog.Logger, ) *ScriptResultProcessor
type ScriptRunner ¶
type ScriptRunner struct {
*ComponentBase
// contains filtered or unexported fields
}
func NewScriptRunner ¶
func NewScriptRunner( ctx context.Context, client client.Client, scriptCode []byte, addressBatchChan <-chan AddressBatch, resultsChan chan<- ProcessedAddressBatch, logger zerolog.Logger, ) *ScriptRunner
type StatusReporter ¶
type StatusReporter interface {
ReportIncrementalBlockDiff(diff uint64)
ReportIncrementalBlockHeight(height uint64)
ReportIsFullScanRunning(running bool)
ReportFullScanProgress(current uint64, total uint64)
}
func NewStatusReporter ¶
func NewStatusReporter( ctx context.Context, prefix string, logger zerolog.Logger, options ...StatusReporterOption, ) StatusReporter
NewStatusReporter creates a new status reporter that reports the status of the indexer to Prometheus. It will start an HTTP server on the given port that exposes the metrics (unless this is disabled for the case where you would want to serve metrics yourself). The prefix is used to prefix all metrics. The status reporter will report: - the incremental block diff (the difference between the last block height handled by the incremental scanner and the current block height) - the incremental block height (the block height last handled by the incremental scanner) - if a full scan is currently running (if it is, any data the scanner is tracking is inaccurate) - if a full scan is currently running, the progress of the full scan (from 0 to 1)
type StatusReporterOption ¶ added in v0.2.0
type StatusReporterOption = func(*statusReporter)
func WithStartServer ¶ added in v0.2.0
func WithStartServer(shouldStartServer bool) StatusReporterOption
func WithStatusReporterPort ¶ added in v0.2.0
func WithStatusReporterPort(port int) StatusReporterOption