scanner

package module
v0.2.0
Warning

This package is not in the latest version of its module.

Published: Jan 20, 2023 License: Apache-2.0 Imports: 19 Imported by: 0

README

Flow Batch Scan

A library that makes it easy to scan the entire Flow chain.

How does it work

The Scanner.Scan method performs a full scan of all addresses on chain, starting at the latest available block and using the provided Cadence script. This takes some time, during which the full scan has to switch to newer reference blocks because the old ones are no longer available. To ensure that the final data is accurate when the scan ends, an incremental scan runs alongside the full scan. The incremental scan looks at new blocks for any accounts that could have had their data (the data the script is looking for) changed in that block. If there are candidates to be scanned, the incremental scanner scans them with the same script and updates the results of the full scan.

In continuous mode the incremental scan will keep running and will scan any candidates that might have changed.

The library expects 3 components:

  • a Cadence script that accepts an address array as input (addresses: [Address]) and returns any Cadence value as the result.
  • an array of candidate scanners that scan a block range looking for accounts that could have changed in a way that would make the script return a different result.
  • a result handler that is called with the results of the script for each address array.
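
As an illustration of the first component, the sketch below embeds a script with the required addresses: [Address] parameter in a Go byte slice, which is the type WithScript accepts. The script body (reading storageUsed per account) and the pre-1.0 Cadence "pub fun" syntax are assumptions chosen for this example; they are not taken from the library. The acceptsAddressArray helper is hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// exampleScript is a hypothetical Cadence script (pre-1.0 syntax assumed).
// It takes the address batch and returns one storage-usage value per address.
const exampleScript = `
pub fun main(addresses: [Address]): [UInt64] {
    var used: [UInt64] = []
    for address in addresses {
        used.append(getAccount(address).storageUsed)
    }
    return used
}
`

// scriptBytes returns the script as []byte, the form WithScript takes.
func scriptBytes() []byte {
	return []byte(exampleScript)
}

// acceptsAddressArray is a cheap textual sanity check (hypothetical helper)
// that the script declares the expected entry point signature.
func acceptsAddressArray(script []byte) bool {
	return strings.Contains(string(script), "main(addresses: [Address])")
}

func main() {
	fmt.Println(acceptsAddressArray(scriptBytes()))
}
```

The textual check only guards against passing the wrong script by mistake; it does not validate the Cadence code itself.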

Use case

Any quantity can be scanned for if:

  • it can be observed by a cadence script
  • a change in the quantity can be observed by looking at the transaction results of a block (e.g. events)

Examples:

  1. Scanning for contracts deployed on accounts (see examples/contracts).
  2. Scanning for accounts' FT or NFT balances.
  3. Scanning for public keys added to accounts.

Examples

See the examples folder, which is heavily commented. The contract_names example is a one-time scan. The monitor_contract_deployments example is a continuous scan and builds on the contract_names example.

Documentation

Constants

const DefaultBatchSize = 1000
const DefaultStatusReporterPort = 2112
const FullScanReferenceBlockSwitch = 30 * time.Second
const IncrementalScannerBlockLag = 5

IncrementalScannerBlockLag is the number of blocks the incremental scanner lags behind the latest block from GetLatestBlockHeader. This is to avoid most of the "retry for collection in finalized block" errors.

const IncrementalScannerMaxBlockGap = 100

IncrementalScannerMaxBlockGap is the maximum number of blocks that can be scanned by the incremental scanner. If the gap is larger than this, the incremental scanner will request a full scan.
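
The interaction between the block lag and the maximum block gap can be sketched as a small decision function. This is a sketch under assumptions, not the library's code: the decide helper and the decision type are hypothetical, and measuring the gap from the lagged target height (rather than from the latest height) is a guess.

```go
package main

import "fmt"

// Values copied from the constants documented above.
const (
	blockLag    uint64 = 5   // IncrementalScannerBlockLag
	maxBlockGap uint64 = 100 // IncrementalScannerMaxBlockGap
)

// decision is a hypothetical type describing what to do next.
type decision int

const (
	wait            decision = iota // nothing new to scan yet
	scanRange                       // scan the returned inclusive block range
	requestFullScan                 // gap too large, fall back to a full scan
)

// decide takes the last block height already handled and the latest block
// height on chain. It stays blockLag blocks behind the latest block and
// requests a full scan when more than maxBlockGap blocks are pending.
func decide(lastHandled, latest uint64) (d decision, from, to uint64) {
	if latest < blockLag {
		return wait, 0, 0 // chain is shorter than the lag
	}
	target := latest - blockLag
	if target <= lastHandled {
		return wait, 0, 0 // already caught up to the lagged height
	}
	if target-lastHandled > maxBlockGap {
		return requestFullScan, 0, 0
	}
	return scanRange, lastHandled + 1, target
}

func main() {
	d, from, to := decide(100, 110)
	fmt.Println(d == scanRange, from, to) // prints: true 101 105
}
```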

const ScriptRunnerMaxConcurrentScripts = 100

ScriptRunnerMaxConcurrentScripts is the maximum number of scripts that can be running concurrently at any given time. If this is more than the rate limit, some scripts will simply wait. As long as they don't wait too long, this is not a problem.
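
A cap on concurrently running work is commonly implemented with a buffered channel used as a semaphore. The sketch below shows that generic pattern; it is not the library's implementation, and runBounded is a hypothetical helper. It returns the peak number of jobs observed running at once so the cap can be checked.

```go
package main

import (
	"fmt"
	"sync"
)

// runBounded runs every job but never more than limit at the same time.
// It returns the highest number of jobs that were running concurrently.
func runBounded(limit int, jobs []func()) int {
	if limit < 1 {
		limit = 1 // a zero-capacity semaphore would block forever
	}
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	var mu sync.Mutex
	running, peak := 0, 0

	for _, job := range jobs {
		wg.Add(1)
		sem <- struct{}{} // blocks while limit jobs are in flight
		go func(job func()) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot after the job ends

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			job()

			mu.Lock()
			running--
			mu.Unlock()
		}(job)
	}
	wg.Wait()
	return peak
}

func main() {
	jobs := make([]func(), 10)
	for i := range jobs {
		jobs[i] = func() {}
	}
	fmt.Println(runBounded(3, jobs) <= 3)
}
```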

Variables

This section is empty.

Functions

This section is empty.

Types

type AddressBatch

type AddressBatch struct {
	Addresses   []flow.Address
	BlockHeight uint64
	// contains filtered or unexported fields
}

AddressBatch is a batch of addresses that will be the input to the script being run by the script runner at the given block height.

func NewAddressBatch

func NewAddressBatch(
	addresses []flow.Address,
	blockHeight uint64,
	doneHandling func(),
	isCancelled func() bool,
) AddressBatch

func (*AddressBatch) DoneHandling

func (b *AddressBatch) DoneHandling()

DoneHandling should be called when the batch has been processed.

func (*AddressBatch) ExcludeAddress

func (b *AddressBatch) ExcludeAddress(address flow.Address)

func (*AddressBatch) IsValid

func (b *AddressBatch) IsValid() bool

IsValid reports whether the batch should still be processed. If the batch has been cancelled, it should not be processed.
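
The intended use of IsValid and DoneHandling might look like the sketch below. The batch type is a simplified stand-in for AddressBatch (string addresses instead of flow.Address), and two behaviours are assumptions rather than documented facts: that IsValid is the negation of the isCancelled callback, and that DoneHandling is also called for a cancelled batch.

```go
package main

import "fmt"

// batch is a simplified stand-in for AddressBatch. It carries the same two
// callbacks that NewAddressBatch accepts.
type batch struct {
	addresses    []string
	blockHeight  uint64
	doneHandling func()
	isCancelled  func() bool
}

// IsValid is assumed to be the negation of the isCancelled callback.
func (b *batch) IsValid() bool { return !b.isCancelled() }

// DoneHandling notifies the producer that this batch needs no more work.
func (b *batch) DoneHandling() { b.doneHandling() }

// process is a hypothetical consumer. It skips cancelled batches and always
// signals DoneHandling (an assumption) so the producer's bookkeeping ends.
// It reports whether the batch was actually handled.
func process(b *batch, handle func(addrs []string, height uint64)) bool {
	defer b.DoneHandling()
	if !b.IsValid() {
		return false
	}
	handle(b.addresses, b.blockHeight)
	return true
}

func main() {
	b := &batch{
		addresses:    []string{"0x01", "0x02"},
		blockHeight:  42,
		doneHandling: func() { fmt.Println("done") },
		isCancelled:  func() bool { return false },
	}
	process(b, func(addrs []string, height uint64) {
		fmt.Println(len(addrs), height)
	})
}
```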

type AddressProvider

type AddressProvider struct {
	// contains filtered or unexported fields
}

AddressProvider is used to get all the addresses that exist at a certain referenceBlockId. This relies on the fact that a certain `endOfAccountsError` will be returned by the `accountStorageUsageScript` if the address doesn't exist yet.

func InitAddressProvider

func InitAddressProvider(ctx context.Context,
	log zerolog.Logger,
	chain flow.ChainID,
	blockHeight uint64,
	client client.Client,
) (*AddressProvider, error)

InitAddressProvider uses bisection to get the last existing address.
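
The bisection idea can be sketched independently of the chain. In the sketch below, exists is a hypothetical predicate standing in for "the script did not return the end-of-accounts error for this account index"; it is assumed to be true for every index up to the last account and false afterwards. The exponential search for an upper bound is an addition for the sketch; how the library picks its bounds is not stated here.

```go
package main

import "fmt"

// lastExisting returns the largest index for which exists is true, assuming
// exists is true for 0..n-1 and false from n onwards. The second result is
// false when even index 0 does not exist. Overflow of the upper bound is
// ignored in this sketch.
func lastExisting(exists func(uint64) bool) (uint64, bool) {
	if !exists(0) {
		return 0, false
	}
	// Grow the upper bound exponentially until it points past the end.
	lo, hi := uint64(0), uint64(1)
	for exists(hi) {
		lo = hi
		hi *= 2
	}
	// Invariant: exists(lo) is true and exists(hi) is false.
	for hi-lo > 1 {
		mid := lo + (hi-lo)/2
		if exists(mid) {
			lo = mid
		} else {
			hi = mid
		}
	}
	return lo, true
}

func main() {
	idx, ok := lastExisting(func(i uint64) bool { return i < 1000 })
	fmt.Println(idx, ok) // prints: 999 true
}
```

Each probe would correspond to one script execution, so the number of calls grows with the logarithm of the account count rather than linearly.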

func (*AddressProvider) AddressesLen

func (p *AddressProvider) AddressesLen() uint

func (*AddressProvider) GenerateAddressBatches

func (p *AddressProvider) GenerateAddressBatches(addressChan chan<- []flow.Address, batchSize int)
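
Splitting a known address list into fixed-size batches and sending them over a channel can be sketched as follows. This is a stand-in using strings instead of flow.Address; closing the channel at the end and falling back to 1000 (the documented DefaultBatchSize) for a non-positive batch size are assumptions of the sketch.

```go
package main

import "fmt"

// generateBatches sends addrs to out in slices of at most batchSize
// elements, then closes out so that receivers can range over it.
func generateBatches(addrs []string, out chan<- []string, batchSize int) {
	defer close(out)
	if batchSize <= 0 {
		batchSize = 1000 // assumption: fall back to DefaultBatchSize
	}
	for start := 0; start < len(addrs); start += batchSize {
		end := start + batchSize
		if end > len(addrs) {
			end = len(addrs)
		}
		out <- addrs[start:end]
	}
}

func main() {
	out := make(chan []string)
	go generateBatches([]string{"a", "b", "c", "d", "e"}, out, 2)
	for b := range out {
		fmt.Println(b)
	}
}
```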

func (*AddressProvider) GetNextAddress

func (p *AddressProvider) GetNextAddress() (address flow.Address, isOutOfBounds bool)

func (*AddressProvider) LastAddress

func (p *AddressProvider) LastAddress() flow.Address

type BlockScannerOption

type BlockScannerOption = func(*Scanner)

func WithBatchSize

func WithBatchSize(batchSize int) BlockScannerOption

func WithCandidateScanners

func WithCandidateScanners(candidateScanners []candidates.CandidateScanner) BlockScannerOption

func WithChainID

func WithChainID(chainID flow.ChainID) BlockScannerOption

func WithContext

func WithContext(ctx context.Context) BlockScannerOption

func WithContinuousScan

func WithContinuousScan(continuous bool) BlockScannerOption

func WithLogger

func WithLogger(logger zerolog.Logger) BlockScannerOption

func WithScript

func WithScript(script []byte) BlockScannerOption

func WithScriptResultHandler

func WithScriptResultHandler(handler ScriptResultHandler) BlockScannerOption

func WithStatusReporter

func WithStatusReporter(reporter StatusReporter) BlockScannerOption
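
The With... functions above follow Go's functional options pattern. The sketch below shows how such options are typically applied by a constructor, using stand-in types with lowercase names. That defaults are set first and that the default batch size equals DefaultBatchSize are assumptions about NewScanner, not documented behaviour.

```go
package main

import "fmt"

// scanner is a stand-in for Scanner with two of its settings.
type scanner struct {
	batchSize  int
	continuous bool
}

// option mirrors the shape of BlockScannerOption: a function that mutates
// the scanner being built.
type option = func(*scanner)

func withBatchSize(n int) option {
	return func(s *scanner) { s.batchSize = n }
}

func withContinuousScan(c bool) option {
	return func(s *scanner) { s.continuous = c }
}

// newScanner sets defaults first, then applies the options in order, so a
// later option overrides an earlier one.
func newScanner(options ...option) *scanner {
	s := &scanner{batchSize: 1000} // assumption: DefaultBatchSize
	for _, opt := range options {
		opt(s)
	}
	return s
}

func main() {
	s := newScanner(withBatchSize(250), withContinuousScan(true))
	fmt.Println(s.batchSize, s.continuous) // prints: 250 true
}
```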

type Component

type Component interface {
	Err() error
	Done() <-chan struct{}
	Started() <-chan struct{}
}

type ComponentBase

type ComponentBase struct {
	Logger zerolog.Logger
	// contains filtered or unexported fields
}

func NewComponent

func NewComponent(name string, logger zerolog.Logger) *ComponentBase

func (*ComponentBase) Done

func (c *ComponentBase) Done() <-chan struct{}

func (*ComponentBase) Err

func (c *ComponentBase) Err() error

func (*ComponentBase) Finish

func (c *ComponentBase) Finish(err error)

func (*ComponentBase) Started

func (c *ComponentBase) Started() <-chan struct{}

func (*ComponentBase) StartupDone

func (c *ComponentBase) StartupDone()
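
Only the signatures of Component and ComponentBase are documented, so the lifecycle below is an assumed reading of them: StartupDone closes the Started channel, and Finish records the error and closes the Done channel. The component type and runWorker are hypothetical stand-ins written for this sketch.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// component is a stand-in with the same method set as ComponentBase.
type component struct {
	started, done         chan struct{}
	startOnce, finishOnce sync.Once
	mu                    sync.Mutex
	err                   error
}

func newComponent() *component {
	return &component{started: make(chan struct{}), done: make(chan struct{})}
}

func (c *component) Started() <-chan struct{} { return c.started }
func (c *component) Done() <-chan struct{}    { return c.done }

func (c *component) Err() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.err
}

// StartupDone signals that setup finished. sync.Once makes repeat calls safe.
func (c *component) StartupDone() {
	c.startOnce.Do(func() { close(c.started) })
}

// Finish stores the terminal error (nil on success) and signals Done.
func (c *component) Finish(err error) {
	c.finishOnce.Do(func() {
		c.mu.Lock()
		c.err = err
		c.mu.Unlock()
		close(c.done)
	})
}

// runWorker starts a goroutine that reports startup and then finishes.
func runWorker(fail bool) *component {
	c := newComponent()
	go func() {
		c.StartupDone()
		if fail {
			c.Finish(errors.New("worker failed"))
			return
		}
		c.Finish(nil)
	}()
	return c
}

func main() {
	c := runWorker(true)
	<-c.Started()
	<-c.Done()
	fmt.Println(c.Err()) // prints: worker failed
}
```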

type FullScan

type FullScan struct {
	*ComponentBase
	// contains filtered or unexported fields
}

type FullScanRunner

type FullScanRunner struct {
	// contains filtered or unexported fields
}

func NewFullScanRunner

func NewFullScanRunner(
	client client.Client,
	addressBatchChan chan<- AddressBatch,
	batchSize int,
	chainID flow.ChainID,
	reporter StatusReporter,
	logger zerolog.Logger,
) *FullScanRunner

func (*FullScanRunner) StartBatch

func (r *FullScanRunner) StartBatch(
	ctx context.Context,
	blockHeight uint64,
) *FullScan

type IncrementalScanner

type IncrementalScanner struct {
	*ComponentBase
	// contains filtered or unexported fields
}

func NewIncrementalScanner

func NewIncrementalScanner(
	ctx context.Context,
	client client.Client,

	addressBatchChan chan<- AddressBatch,
	requestBatchChan chan<- uint64,

	startAtBlock uint64,
	batchSize int,

	blockCandidateScanners []candidates.CandidateScanner,

	reporter StatusReporter,
	logger zerolog.Logger,

) *IncrementalScanner

func (*IncrementalScanner) LatestHandledBlock

func (r *IncrementalScanner) LatestHandledBlock() uint64

type NoOpScriptResultHandler

type NoOpScriptResultHandler struct{}

func (NoOpScriptResultHandler) Handle

type NoOpStatusReporter

type NoOpStatusReporter struct{}

func (NoOpStatusReporter) ReportFullScanProgress

func (n NoOpStatusReporter) ReportFullScanProgress(uint64, uint64)

func (NoOpStatusReporter) ReportIncrementalBlockDiff

func (n NoOpStatusReporter) ReportIncrementalBlockDiff(uint64)

func (NoOpStatusReporter) ReportIncrementalBlockHeight

func (n NoOpStatusReporter) ReportIncrementalBlockHeight(uint64)

func (NoOpStatusReporter) ReportIsFullScanRunning

func (n NoOpStatusReporter) ReportIsFullScanRunning(bool)

type ProcessedAddressBatch

type ProcessedAddressBatch struct {
	AddressBatch
	Result cadence.Value
}

ProcessedAddressBatch contains the result of running the script on the given batch of addresses.

type ScanConcluded

type ScanConcluded struct {
	LatestScannedBlockHeight uint64
	// ScanIsComplete is false if a full scan was not completed.
	// This means some accounts may have stale data or have been missed altogether.
	ScanIsComplete bool
}

type Scanner added in v0.2.0

type Scanner struct {
	// contains filtered or unexported fields
}

func NewScanner added in v0.2.0

func NewScanner(
	client client.Client,
	options ...BlockScannerOption,
) *Scanner

func (*Scanner) Scan added in v0.2.0

func (scanner *Scanner) Scan() (ScanConcluded, error)

type ScriptResultHandler

type ScriptResultHandler interface {
	// Handle will be called concurrently for each ProcessedAddressBatch.
	Handle(batch ProcessedAddressBatch) error
}
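
Because Handle is called concurrently, an implementation that writes to shared state needs its own synchronisation. The sketch below guards a map with a mutex and keeps the value from the highest block height per address. The processedBatch type is a simplified stand-in for ProcessedAddressBatch: string addresses, and a result slice with one entry per address instead of a cadence.Value. Both simplifications, and the "newest height wins" rule, are assumptions of the sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// processedBatch is a simplified stand-in for ProcessedAddressBatch.
type processedBatch struct {
	addresses   []string
	blockHeight uint64
	result      []string // assumed: one entry per address, in order
}

type entry struct {
	height uint64
	value  string
}

// latestHandler keeps the most recent value seen for each address.
type latestHandler struct {
	mu   sync.Mutex
	data map[string]entry
}

func newLatestHandler() *latestHandler {
	return &latestHandler{data: make(map[string]entry)}
}

// Handle is safe for concurrent use. Results from an older block height
// never overwrite results from a newer one.
func (h *latestHandler) Handle(b processedBatch) error {
	if len(b.result) != len(b.addresses) {
		return fmt.Errorf("got %d results for %d addresses",
			len(b.result), len(b.addresses))
	}
	h.mu.Lock()
	defer h.mu.Unlock()
	for i, addr := range b.addresses {
		if old, ok := h.data[addr]; ok && old.height > b.blockHeight {
			continue // stale result, keep the newer one
		}
		h.data[addr] = entry{height: b.blockHeight, value: b.result[i]}
	}
	return nil
}

func (h *latestHandler) get(addr string) (string, bool) {
	h.mu.Lock()
	defer h.mu.Unlock()
	e, ok := h.data[addr]
	return e.value, ok
}

func main() {
	h := newLatestHandler()
	_ = h.Handle(processedBatch{addresses: []string{"0x01"}, blockHeight: 10, result: []string{"full scan"}})
	_ = h.Handle(processedBatch{addresses: []string{"0x01"}, blockHeight: 12, result: []string{"incremental"}})
	v, _ := h.get("0x01")
	fmt.Println(v) // prints: incremental
}
```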

type ScriptResultProcessor

type ScriptResultProcessor struct {
	*ComponentBase
	// contains filtered or unexported fields
}

func NewScriptResultProcessor

func NewScriptResultProcessor(
	ctx context.Context,
	outChan <-chan ProcessedAddressBatch,
	handler ScriptResultHandler,
	logger zerolog.Logger,
) *ScriptResultProcessor

type ScriptRunner

type ScriptRunner struct {
	*ComponentBase
	// contains filtered or unexported fields
}

func NewScriptRunner

func NewScriptRunner(
	ctx context.Context,
	client client.Client,
	scriptCode []byte,
	addressBatchChan <-chan AddressBatch,
	resultsChan chan<- ProcessedAddressBatch,
	logger zerolog.Logger,
) *ScriptRunner

type StatusReporter

type StatusReporter interface {
	ReportIncrementalBlockDiff(diff uint64)
	ReportIncrementalBlockHeight(height uint64)
	ReportIsFullScanRunning(running bool)
	ReportFullScanProgress(current uint64, total uint64)
}

func NewStatusReporter

func NewStatusReporter(
	ctx context.Context,
	prefix string,
	logger zerolog.Logger,
	options ...StatusReporterOption,
) StatusReporter

NewStatusReporter creates a new status reporter that reports the status of the indexer to Prometheus. It starts an HTTP server on the given port that exposes the metrics (unless this is disabled, for the case where you want to serve metrics yourself). The prefix is used to prefix all metrics. The status reporter will report:

  • the incremental block diff (the difference between the last block height handled by the incremental scanner and the current block height)
  • the incremental block height (the block height last handled by the incremental scanner)
  • whether a full scan is currently running (if it is, any data the scanner is tracking is inaccurate)
  • if a full scan is currently running, the progress of the full scan (from 0 to 1)
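
Since StatusReporter is an interface, a custom implementation can be supplied through WithStatusReporter, for example in tests where no Prometheus server is wanted. The sketch below is a hypothetical in-memory implementation. The statusReporter interface is a local copy of the documented method set, and deriving the 0 to 1 progress as current divided by total is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// statusReporter is a local copy of the documented StatusReporter methods.
type statusReporter interface {
	ReportIncrementalBlockDiff(diff uint64)
	ReportIncrementalBlockHeight(height uint64)
	ReportIsFullScanRunning(running bool)
	ReportFullScanProgress(current uint64, total uint64)
}

// memoryReporter stores the latest reported values in memory.
type memoryReporter struct {
	mu       sync.Mutex
	diff     uint64
	height   uint64
	running  bool
	progress float64
}

// Compile-time check that memoryReporter has the full method set.
var _ statusReporter = (*memoryReporter)(nil)

func (m *memoryReporter) ReportIncrementalBlockDiff(diff uint64) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.diff = diff
}

func (m *memoryReporter) ReportIncrementalBlockHeight(height uint64) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.height = height
}

func (m *memoryReporter) ReportIsFullScanRunning(running bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.running = running
}

// ReportFullScanProgress stores progress as a fraction between 0 and 1.
func (m *memoryReporter) ReportFullScanProgress(current uint64, total uint64) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if total == 0 {
		m.progress = 0 // avoid dividing by zero
		return
	}
	m.progress = float64(current) / float64(total)
}

// snapshot returns a consistent copy of all stored values.
func (m *memoryReporter) snapshot() (diff, height uint64, running bool, progress float64) {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.diff, m.height, m.running, m.progress
}

func main() {
	r := &memoryReporter{}
	r.ReportIsFullScanRunning(true)
	r.ReportFullScanProgress(250, 1000)
	_, _, running, progress := r.snapshot()
	fmt.Println(running, progress) // prints: true 0.25
}
```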

type StatusReporterOption added in v0.2.0

type StatusReporterOption = func(*statusReporter)

func WithStartServer added in v0.2.0

func WithStartServer(shouldStartServer bool) StatusReporterOption

func WithStatusReporterPort added in v0.2.0

func WithStatusReporterPort(port int) StatusReporterOption

Directories

Path Synopsis
examples
contract_names command
