merger

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 24, 2024 License: Apache-2.0 Imports: 23 Imported by: 0

README

StreamingFast Merger

reference License

The merger process is responsible for accumulating blocks from all forks visible to the pool of instrumented nodes, and for building the famous 100-blocks files consumed by bstream's FileSource and many other StreamingFast processes.

Design

The Merger section of the official Firehose documentation provides additional information on its design details.

https://firehose.streamingfast.io/concepts-and-architeceture/components#merger

Contributing

Issues and PRs in this repo should relate strictly to the merger functionality.

Report any protocol-specific issues in their respective repositories

If you wish to contribute to this code base, please first refer to the general StreamingFast contribution guide.

License

Apache 2.0

Documentation

Index

Constants

View Source
const ParallelOneBlockDownload = 2

Variables

View Source
var DefaultFilesDeleteBatchSize = 10000
View Source
var DefaultFilesDeleteThreads = 8
View Source
var DeleteObjectTimeout = 5 * time.Minute
View Source
var ErrFirstBlockAfterInitialStreamableBlock = errors.New("received first block after inital streamable block")
View Source
var ErrHoleFound = errors.New("hole found in merged files")
View Source
var ErrStopBlockReached = errors.New("stop block reached")
View Source
var GetObjectTimeout = 5 * time.Minute
View Source
var ListFilesTimeout = 10 * time.Minute
View Source
var WriteObjectTimeout = 5 * time.Minute

Functions

func Retry

func Retry(logger *zap.Logger, attempts int, sleep time.Duration, function func() error) (err error)

Types

type BundleReader

type BundleReader struct {
	// contains filtered or unexported fields
}

func NewBundleReader

func NewBundleReader(ctx context.Context, logger *zap.Logger, tracer logging.Tracer, oneBlockFiles []*bstream.OneBlockFile, anyOneBlockFile *bstream.OneBlockFile, oneBlockDownloader bstream.OneBlockDownloaderFunc) (*BundleReader, error)

func (*BundleReader) Read

func (r *BundleReader) Read(p []byte) (bytesRead int, err error)

type Bundler

type Bundler struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewBundler

func NewBundler(startBlock, stopBlock, firstStreamableBlock, bundleSize uint64, io IOInterface) *Bundler

func (*Bundler) BaseBlockNum

func (b *Bundler) BaseBlockNum() uint64

BaseBlockNum can be called from a different thread

func (*Bundler) HandleBlockFile

func (b *Bundler) HandleBlockFile(obf *bstream.OneBlockFile) error

func (*Bundler) ProcessBlock

func (b *Bundler) ProcessBlock(_ *pbbstream.Block, obj interface{}) error

func (*Bundler) Reset

func (b *Bundler) Reset(nextBase uint64, lib bstream.BlockRef)

func (*Bundler) String

func (b *Bundler) String() string

String can be called from a different thread

type DStoreIO

type DStoreIO struct {
	// contains filtered or unexported fields
}

func (*DStoreIO) DeleteAsync

func (s *DStoreIO) DeleteAsync(oneBlockFiles []*bstream.OneBlockFile) error

func (*DStoreIO) DownloadOneBlockFile

func (s *DStoreIO) DownloadOneBlockFile(ctx context.Context, oneBlockFile *bstream.OneBlockFile) (data []byte, err error)

func (*DStoreIO) MergeAndStore

func (s *DStoreIO) MergeAndStore(ctx context.Context, inclusiveLowerBlock uint64, oneBlockFiles []*bstream.OneBlockFile) (err error)

func (*DStoreIO) NextBundle

func (s *DStoreIO) NextBundle(ctx context.Context, lowestBaseBlock uint64) (outBaseBlock uint64, lib bstream.BlockRef, err error)

func (*DStoreIO) WalkOneBlockFiles

func (s *DStoreIO) WalkOneBlockFiles(ctx context.Context, lowestBlock uint64, callback func(*bstream.OneBlockFile) error) error

type ForkAwareDStoreIO

type ForkAwareDStoreIO struct {
	*DStoreIO
	// contains filtered or unexported fields
}

func (*ForkAwareDStoreIO) DeleteForkedBlocksAsync

func (s *ForkAwareDStoreIO) DeleteForkedBlocksAsync(inclusiveLowBoundary, inclusiveHighBoundary uint64)

func (*ForkAwareDStoreIO) MoveForkedBlocks

func (s *ForkAwareDStoreIO) MoveForkedBlocks(ctx context.Context, oneBlockFiles []*bstream.OneBlockFile)

type ForkAwareIOInterface

type ForkAwareIOInterface interface {
	// DeleteForkedBlocksAsync will delete forked blocks between lowBoundary and highBoundary (both inclusive)
	DeleteForkedBlocksAsync(inclusiveLowBoundary, inclusiveHighBoundary uint64)

	// MoveForkedBlocks will copy an array of oneBlockFiles to the forkedBlocksStore, then delete them (dstore does not have MOVE primitive)
	MoveForkedBlocks(ctx context.Context, oneBlockFiles []*bstream.OneBlockFile)
}

type IOInterface

type IOInterface interface {

	// NextBundle will read through consecutive merged blocks, starting at `lowestBaseBlock`, and return the next bundle that needs to be created
	// If it finds an existing merged file at `lowestBaseBlock`, it will read the last one and include the lastIrreversibleBlock so you can bootstrap your forkdb from there
	NextBundle(ctx context.Context, lowestBaseBlock uint64) (baseBlock uint64, lastIrreversibleBlock bstream.BlockRef, err error)

	// WalkOneBlockFiles calls your function for each oneBlockFile it reads, starting at the inclusiveLowerBlock. Useful to feed a block source
	WalkOneBlockFiles(ctx context.Context, inclusiveLowerBlock uint64, callback func(*bstream.OneBlockFile) error) error

	// MergeAndStore writes a merged file from a list of oneBlockFiles
	MergeAndStore(ctx context.Context, inclusiveLowerBlock uint64, oneBlockFiles []*bstream.OneBlockFile) (err error)

	// DownloadOneBlockFile will get you the data from the file
	DownloadOneBlockFile(ctx context.Context, oneBlockFile *bstream.OneBlockFile) (data []byte, err error)

	// DeleteAsync should be able to delete large quantities of oneBlockFiles from storage without ever blocking
	DeleteAsync(oneBlockFiles []*bstream.OneBlockFile) error
}

func NewDStoreIO

func NewDStoreIO(
	logger *zap.Logger,
	tracer logging.Tracer,
	oneBlocksStore dstore.Store,
	mergedBlocksStore dstore.Store,
	forkedBlocksStore dstore.Store,
	retryAttempts int,
	retryCooldown time.Duration,
	bundleSize uint64,
) IOInterface

type Merger

type Merger struct {
	*shutter.Shutter
	// contains filtered or unexported fields
}

func NewMerger

func NewMerger(
	logger *zap.Logger,
	grpcListenAddr string,
	io IOInterface,

	firstStreamableBlock uint64,
	bundleSize uint64,
	pruningDistanceToLIB uint64,
	timeBetweenPruning time.Duration,
	timeBetweenPolling time.Duration,
	stopBlock uint64,
) *Merger

func (*Merger) Check

Check is a basic GRPC Healthcheck

func (*Merger) Run

func (m *Merger) Run()

func (*Merger) Watch

Watch is a basic GRPC Healthcheck as a stream

type TestMergerIO

type TestMergerIO struct {
	NextBundleFunc           func(ctx context.Context, lowestBaseBlock uint64) (baseBlock uint64, lastIrreversibleBlock bstream.BlockRef, err error)
	WalkOneBlockFilesFunc    func(ctx context.Context, inclusiveLowerBlock uint64, callback func(*bstream.OneBlockFile) error) error
	MergeAndStoreFunc        func(ctx context.Context, inclusiveLowerBlock uint64, oneBlockFiles []*bstream.OneBlockFile) (err error)
	DownloadOneBlockFileFunc func(ctx context.Context, oneBlockFile *bstream.OneBlockFile) (data []byte, err error)
	DeleteAsyncFunc          func(oneBlockFiles []*bstream.OneBlockFile) error
}

func (*TestMergerIO) DeleteAsync

func (io *TestMergerIO) DeleteAsync(oneBlockFiles []*bstream.OneBlockFile) error

func (*TestMergerIO) DownloadOneBlockFile

func (io *TestMergerIO) DownloadOneBlockFile(ctx context.Context, oneBlockFile *bstream.OneBlockFile) (data []byte, err error)

func (*TestMergerIO) MergeAndStore

func (io *TestMergerIO) MergeAndStore(ctx context.Context, inclusiveLowerBlock uint64, oneBlockFiles []*bstream.OneBlockFile) (err error)

func (*TestMergerIO) NextBundle

func (io *TestMergerIO) NextBundle(ctx context.Context, lowestBaseBlock uint64) (baseBlock uint64, lastIrreversibleBlock bstream.BlockRef, err error)

func (*TestMergerIO) WalkOneBlockFiles

func (io *TestMergerIO) WalkOneBlockFiles(ctx context.Context, inclusiveLowerBlock uint64, callback func(*bstream.OneBlockFile) error) error

Directories

Path Synopsis
app

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL