Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var ErrFinishedFetchBlockLoop = errors.New("finished FetchBlockLoop")
Functions ¶
Types ¶
type ErrorState ¶
type ErrorState struct {
RPCErrors []ErrorInfo
DuneErrors []ErrorInfo
RPCErrorCount int
DuneErrorCount int
}
func (*ErrorState) ObserveDuneError ¶
func (es *ErrorState) ObserveDuneError(err ErrorInfo)
func (*ErrorState) ObserveRPCError ¶
func (es *ErrorState) ObserveRPCError(err ErrorInfo)
func (*ErrorState) Reset ¶
func (es *ErrorState) Reset()
type Info ¶
type Info struct {
BlockchainName string
Stack string
LatestBlockNumber int64
IngestedBlockNumber int64
ConsumedBlockNumber int64
Errors ErrorState
Since time.Time
}
func (Info) ProgressReportErrors ¶
func (info Info) ProgressReportErrors() []models.BlockchainIndexError
ProgressReportErrors returns a combined list of errors from RPC requests and Dune requests
func (*Info) ResetErrors ¶
func (info *Info) ResetErrors()
func (*Info) ToProgressReport ¶
func (info *Info) ToProgressReport() models.BlockchainIndexProgress
type Ingester ¶
type Ingester interface {
// Run starts the ingester and blocks until the context is cancelled or maxCount blocks are ingested
Run(ctx context.Context, startBlockNumber int64, maxCount int64) error
// ProduceBlockNumbers sends block numbers from startBlockNumber to endBlockNumber to outChan, inclusive.
// If endBlockNumber is -1, it sends blocks from startBlockNumber to the tip of the chain
// it will run continuously until the context is cancelled
ProduceBlockNumbers(ctx context.Context, outChan chan int64, startBlockNumber int64, endBlockNumber int64) error
// FetchBlockLoop fetches blocks sent on the channel and sends them on the other channel.
// It will run continuously until the context is cancelled, or the channel is closed.
// It can safely be run concurrently.
FetchBlockLoop(context.Context, chan int64, chan models.RPCBlock) error
// SendBlocks consumes RPCBlocks from the channel, reorders them, and sends batches to DuneAPI in an endless loop
// it will block until:
// - the context is cancelled
// - channel is closed
// - a fatal error occurs
SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock, startFrom int64) error
// ProduceBlockNumbersDLQ sends block numbers from the DLQ to outChan.
// It will run continuously until the context is cancelled.
// When the DLQ does not return an eligible next block, it waits for PollDLQInterval before trying again
ProduceBlockNumbersDLQ(ctx context.Context, outChan chan dlq.Item[int64]) error
// FetchBlockLoopDLQ fetches blocks sent on the channel and sends them on the other channel.
// It will run continuously until the context is cancelled, or the channel is closed.
// It can safely be run concurrently.
FetchBlockLoopDLQ(ctx context.Context,
blockNumbers <-chan dlq.Item[int64],
blocks chan<- dlq.Item[models.RPCBlock],
) error
// SendBlocksDLQ pushes one RPCBlock at a time to DuneAPI in the order they are received in
SendBlocksDLQ(ctx context.Context, blocks <-chan dlq.Item[models.RPCBlock]) error
Close() error
}
func New ¶
func New( log *slog.Logger, node jsonrpc.BlockchainClient, dune duneapi.BlockchainIngester, duneDLQ duneapi.BlockchainIngester, cfg Config, progress *models.BlockchainIndexProgress, dlq *dlq.DLQ[int64], ) Ingester
Click to show internal directories.
Click to hide internal directories.