Documentation
¶
Index ¶
- Constants
- Variables
- func NewStreamingBundleReader(ctx context.Context, logger *zap.Logger, oneBlockFiles []*bstream.OneBlockFile, ...) (io.Reader, error)
- func Retry(logger *zap.Logger, attempts int, sleep time.Duration, function func() error) (err error)
- type Bundler
- type DStoreIO
- func (s *DStoreIO) DeleteAsync(oneBlockFiles []*bstream.OneBlockFile) error
- func (s *DStoreIO) MergeAndStore(ctx context.Context, inclusiveLowerBlock uint64, ...) (err error)
- func (s *DStoreIO) NextBundle(ctx context.Context, lowestBaseBlock uint64) (outBaseBlock uint64, lib bstream.BlockRef, err error)
- func (s *DStoreIO) OpenOneBlockFile(ctx context.Context, oneBlockFile *bstream.OneBlockFile) (io.ReadCloser, error)
- func (s *DStoreIO) WalkOneBlockFiles(ctx context.Context, lowestBlock uint64, ...) error
- type ForkAwareDStoreIO
- type ForkAwareIOInterface
- type IOInterface
- type Merger
- func (m *Merger) Check(ctx context.Context, in *pbhealth.HealthCheckRequest) (*pbhealth.HealthCheckResponse, error)
- func (m *Merger) List(ctx context.Context, in *pbhealth.HealthListRequest) (*pbhealth.HealthListResponse, error)
- func (m *Merger) Run()
- func (m *Merger) Watch(req *pbhealth.HealthCheckRequest, stream pbhealth.Health_WatchServer) error
- type TestMergerIO
- func (io *TestMergerIO) DeleteAsync(oneBlockFiles []*bstream.OneBlockFile) error
- func (io *TestMergerIO) MergeAndStore(ctx context.Context, inclusiveLowerBlock uint64, ...) (err error)
- func (io *TestMergerIO) NextBundle(ctx context.Context, lowestBaseBlock uint64) (baseBlock uint64, lastIrreversibleBlock bstream.BlockRef, err error)
- func (io *TestMergerIO) OpenOneBlockFile(ctx context.Context, obf *bstream.OneBlockFile) (io.ReadCloser, error)
- func (io *TestMergerIO) WalkOneBlockFiles(ctx context.Context, inclusiveLowerBlock uint64, ...) error
Constants ¶
View Source
const ParallelOneBlockDownload = 2
Variables ¶
View Source
var DefaultFilesDeleteBatchSize = 10000
View Source
var DeleteObjectTimeout = 5 * time.Minute
View Source
var ErrFirstBlockAfterInitialStreamableBlock = errors.New("received first block after inital streamable block")
View Source
var ErrHoleFound = errors.New("hole found in merged files")
View Source
var ErrStopBlockReached = errors.New("stop block reached")
View Source
var GetObjectTimeout = 5 * time.Minute
View Source
var ListFilesTimeout = 10 * time.Minute
View Source
var WriteObjectTimeout = 5 * time.Minute
Functions ¶
func NewStreamingBundleReader ¶ added in v1.14.0
func NewStreamingBundleReader(ctx context.Context, logger *zap.Logger, oneBlockFiles []*bstream.OneBlockFile, anyOneBlockFile *bstream.OneBlockFile, opener func(context.Context, *bstream.OneBlockFile) (io.ReadCloser, error)) (io.Reader, error)
NewStreamingBundleReader creates an io.Reader that streams one-block-files directly from storage without loading them into memory. It opens each file via opener one at a time, strips DBIN headers from all files except the first, and pipes the concatenated output to the returned reader.
Types ¶
type Bundler ¶
func NewBundler ¶
func NewBundler(startBlock, stopBlock, firstStreamableBlock, bundleSize uint64, io IOInterface, maxMergingThreads int, shutDownFunc func(error)) *Bundler
func (*Bundler) HandleBlockFile ¶
func (b *Bundler) HandleBlockFile(obf *bstream.OneBlockFile) error
func (*Bundler) ProcessBlock ¶
func (*Bundler) WaitForMerges ¶ added in v1.14.0
func (b *Bundler) WaitForMerges()
WaitForMerges blocks until all in-flight async merges have completed.
type DStoreIO ¶
type DStoreIO struct {
// contains filtered or unexported fields
}
func (*DStoreIO) DeleteAsync ¶
func (s *DStoreIO) DeleteAsync(oneBlockFiles []*bstream.OneBlockFile) error
func (*DStoreIO) MergeAndStore ¶
func (*DStoreIO) NextBundle ¶
func (*DStoreIO) OpenOneBlockFile ¶ added in v1.14.0
func (s *DStoreIO) OpenOneBlockFile(ctx context.Context, oneBlockFile *bstream.OneBlockFile) (io.ReadCloser, error)
func (*DStoreIO) WalkOneBlockFiles ¶
type ForkAwareDStoreIO ¶
type ForkAwareDStoreIO struct {
*DStoreIO
// contains filtered or unexported fields
}
func (*ForkAwareDStoreIO) DeleteForkedBlocksAsync ¶
func (s *ForkAwareDStoreIO) DeleteForkedBlocksAsync(inclusiveLowBoundary, inclusiveHighBoundary uint64)
func (*ForkAwareDStoreIO) MoveForkedBlocks ¶
func (s *ForkAwareDStoreIO) MoveForkedBlocks(ctx context.Context, oneBlockFiles []*bstream.OneBlockFile)
type ForkAwareIOInterface ¶
type ForkAwareIOInterface interface {
// DeleteForkedBlocksAsync will delete forked blocks between lowBoundary and highBoundary (both inclusive)
DeleteForkedBlocksAsync(inclusiveLowBoundary, inclusiveHighBoundary uint64)
// MoveForkedBlocks will copy an array of oneBlockFiles to the forkedBlocksStore, then delete them (dstore does not have MOVE primitive)
MoveForkedBlocks(ctx context.Context, oneBlockFiles []*bstream.OneBlockFile)
}
type IOInterface ¶
type IOInterface interface {
// NextBundle will read through consecutive merged blocks, starting at `lowestBaseBlock`, and return the next bundle that needs to be created
// If it finds an existing merged file at `lowestBaseBlock`, it will read the last one and include the lastIrreversibleBlock so you can bootstrap your forkdb from there
NextBundle(ctx context.Context, lowestBaseBlock uint64) (baseBlock uint64, lastIrreversibleBlock bstream.BlockRef, err error)
// WalkOneBlockFiles calls your function for each oneBlockFile it reads, starting at the inclusiveLowerBlock. Useful to feed a block source
WalkOneBlockFiles(ctx context.Context, inclusiveLowerBlock uint64, callback func(*bstream.OneBlockFile) error) error
// MergeAndStore writes a merged file from a list of oneBlockFiles
MergeAndStore(ctx context.Context, inclusiveLowerBlock uint64, oneBlockFiles []*bstream.OneBlockFile) (err error)
// DeleteAsync should be able to delete large quantities of oneBlockFiles from storage without ever blocking
DeleteAsync(oneBlockFiles []*bstream.OneBlockFile) error
// OpenOneBlockFile opens a one-block-file for reading
OpenOneBlockFile(ctx context.Context, obf *bstream.OneBlockFile) (io.ReadCloser, error)
}
type Merger ¶
func (*Merger) Check ¶
func (m *Merger) Check(ctx context.Context, in *pbhealth.HealthCheckRequest) (*pbhealth.HealthCheckResponse, error)
Check is basic GRPC Healthcheck
func (*Merger) List ¶ added in v1.9.9
func (m *Merger) List(ctx context.Context, in *pbhealth.HealthListRequest) (*pbhealth.HealthListResponse, error)
func (*Merger) Watch ¶
func (m *Merger) Watch(req *pbhealth.HealthCheckRequest, stream pbhealth.Health_WatchServer) error
Watch is basic GRPC Healthcheck as a stream
type TestMergerIO ¶
type TestMergerIO struct {
NextBundleFunc func(ctx context.Context, lowestBaseBlock uint64) (baseBlock uint64, lastIrreversibleBlock bstream.BlockRef, err error)
WalkOneBlockFilesFunc func(ctx context.Context, inclusiveLowerBlock uint64, callback func(*bstream.OneBlockFile) error) error
MergeAndStoreFunc func(ctx context.Context, inclusiveLowerBlock uint64, oneBlockFiles []*bstream.OneBlockFile) (err error)
DeleteAsyncFunc func(oneBlockFiles []*bstream.OneBlockFile) error
OpenOneBlockFileFunc func(ctx context.Context, obf *bstream.OneBlockFile) (io.ReadCloser, error)
}
func (*TestMergerIO) DeleteAsync ¶
func (io *TestMergerIO) DeleteAsync(oneBlockFiles []*bstream.OneBlockFile) error
func (*TestMergerIO) MergeAndStore ¶
func (io *TestMergerIO) MergeAndStore(ctx context.Context, inclusiveLowerBlock uint64, oneBlockFiles []*bstream.OneBlockFile) (err error)
func (*TestMergerIO) NextBundle ¶
func (*TestMergerIO) OpenOneBlockFile ¶ added in v1.14.0
func (io *TestMergerIO) OpenOneBlockFile(ctx context.Context, obf *bstream.OneBlockFile) (io.ReadCloser, error)
func (*TestMergerIO) WalkOneBlockFiles ¶
func (io *TestMergerIO) WalkOneBlockFiles(ctx context.Context, inclusiveLowerBlock uint64, callback func(*bstream.OneBlockFile) error) error
Source Files
¶
Click to show internal directories.
Click to hide internal directories.