blockstream

package
v0.1.0-alpha.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 15, 2026 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
// Block source kinds, numbered consecutively from 0 by iota.
//
// The constants are declared without an explicit type, so they are untyped
// integer constants that convert implicitly to BlockSourceType (or any other
// integer type) at the point of use.
// NOTE(review): presumably these are the values intended for
// BlockSourceOpts.SourceType — confirm against NewBlockSource.
const (
	BlockSourceRpc = iota // 0
	BlockSourceFile // 1
	BlockSourceLightbringer // 2
)

Variables

This section is empty.

Functions

This section is empty.

Types

type BlockSource

// BlockSource is the handle returned by NewBlockSource. None of its fields
// are part of the public API; callers interact with it only through its
// methods.
type BlockSource struct {
	// contains filtered or unexported fields
}

func NewBlockSource

func NewBlockSource(opts *BlockSourceOpts) *BlockSource

func (*BlockSource) BufferDepth

func (bs *BlockSource) BufferDepth() int

func (*BlockSource) DownloadInitialBlocks

func (bs *BlockSource) DownloadInitialBlocks()

DownloadInitialBlocks is kept for backward compatibility but is a no-op. The parallel scheduler handles the initial prefetch naturally.

func (*BlockSource) GetFetchStats

func (bs *BlockSource) GetFetchStats() FetchStatsSnapshot

GetFetchStats returns a snapshot of current fetch statistics

func (*BlockSource) NextBlock

func (bs *BlockSource) NextBlock() *b.Block

func (*BlockSource) NotifyBlockStart

func (bs *BlockSource) NotifyBlockStart(slot uint64)

NotifyBlockStart is called at the START of block execution. In near-tip mode, this triggers fetching N+1 so the RPC latency (~200ms) overlaps with execution time, hiding the wait from the user.

NOTE: We can't use canScheduleMore() here because lastExecutedSlot hasn't been updated yet (we're about to execute, not finished). Instead, we directly check if the next slot is already scheduled/done and schedule it if not.

func (*BlockSource) RefreshTipsForSummary

func (bs *BlockSource) RefreshTipsForSummary()

RefreshTipsForSummary triggers an async refresh of both confirmed and processed tips. Call this near summary time (e.g., at slot 95) so fresh tips are ready at slot 100. This is non-blocking - it spawns a goroutine to do the work.

func (*BlockSource) ResetStats

func (bs *BlockSource) ResetStats()

ResetStats resets the fetch statistics (useful between 100-slot windows)

func (*BlockSource) SetLastExecutedSlot

func (bs *BlockSource) SetLastExecutedSlot(slot uint64)

SetLastExecutedSlot is called by the replay loop after each block is fully executed. This allows accurate tip distance calculation without blocking on replay progress. Also triggers mode switching based on replay progress (not just tip polling).

In near-tip mode, this also schedules N+2 (prefetching it while N+1 executes) and immediately retries N+1 if it failed, rather than waiting for the 200ms ticker.

func (*BlockSource) StallTimeout

func (bs *BlockSource) StallTimeout() time.Duration

StallTimeout returns the configured stall timeout duration.

func (*BlockSource) Stalled

func (bs *BlockSource) Stalled() bool

Stalled returns true if the block source stalled due to persistent fetch failures. When true, the caller should trigger a graceful shutdown to preserve AccountsDB state.

func (*BlockSource) Start

func (bs *BlockSource) Start()

Start begins parallel block fetching

type BlockSourceOpts

// BlockSourceOpts is the configuration passed to NewBlockSource.
//
// For every tuning field documented as "0 = use default", leaving the field
// at its zero value selects the package's built-in default.
type BlockSourceOpts struct {
	RpcClient  *rpcclient.RpcClient // Primary RPC for block fetching (getBlock)
	// NOTE(review): presumably one of BlockSourceRpc, BlockSourceFile or
	// BlockSourceLightbringer — confirm.
	SourceType BlockSourceType
	// NOTE(review): presumably the first and last slots to fetch; whether
	// EndSlot is inclusive, or whether 0 means "unbounded", is not visible
	// here — confirm.
	StartSlot  uint64
	EndSlot    uint64
	// NOTE(review): presumably the directory blocks are read from when
	// SourceType is BlockSourceFile — confirm.
	BlockDir   string

	// Backup RPC endpoints for failover (optional)
	// These are tried in order if the primary fails with hard connectivity errors
	// (connection refused, no such host, etc.). NOT used for timeouts or rate limits.
	// After 100 slots, the primary is retried and restored if working.
	BackupRpcEndpoints []string

	// Parallel fetch settings
	MaxRPS          int    // Rate limit (requests per second), 0 = use default
	MaxInflight     int    // Max concurrent workers, 0 = use default
	TipPollMs       int    // Tip poll interval ms, 0 = use default
	TipSafetyMargin uint64 // Don't fetch within N slots of tip, 0 = use default

	// Mode thresholds (hysteresis)
	// The two thresholds are separate so that entering and leaving near-tip
	// mode happen at different gap sizes.
	NearTipThreshold int // Enter near-tip when gap <= this, 0 = use default
	CatchupThreshold int // Exit near-tip when gap >= this, 0 = use default

	// Tip gate: only apply safety margin when gap > this
	CatchupTipGateThreshold int // 0 = use default

	// Near-tip tuning
	NearTipPollMs    int // Faster poll interval in near-tip, 0 = use default
	NearTipLookahead int // Slots ahead to schedule in near-tip, 0 = use default
}

type BlockSourceStats

// BlockSourceStats contains metrics for parallel block fetching.
//
// Every field is an atomic.Uint64, so each counter can be read and updated
// individually without additional locking. Atomic values must not be copied
// after first use, so pass this struct by pointer rather than by value.
type BlockSourceStats struct {
	// Fetch counts
	FetchAttempts  atomic.Uint64
	FetchSuccesses atomic.Uint64
	FetchRetries   atomic.Uint64
	FetchSkipped   atomic.Uint64

	// Speculative/backup requests
	SpeculativeRetries atomic.Uint64 // Backup requests sent for slow slots

	// Error buckets
	ErrSlotNotAvail atomic.Uint64
	ErrRateLimited  atomic.Uint64
	ErrBeyondTip    atomic.Uint64
	ErrTransient    atomic.Uint64 // EOF, timeout, 502/503, connection reset, etc.
	ErrOther        atomic.Uint64

	// Latency tracking (nanoseconds)
	// NOTE(review): presumably averaged as TotalFetchLatencyNs /
	// FetchLatencyCount to produce FetchStatsSnapshot.AvgLatencyMs — confirm.
	TotalFetchLatencyNs atomic.Uint64
	FetchLatencyCount   atomic.Uint64

	// Buffer stats
	MaxBufferedSlot atomic.Uint64
}

BlockSourceStats contains metrics for parallel block fetching

type BlockSourceType

// BlockSourceType is the integer type of BlockSourceOpts.SourceType.
// NOTE(review): presumably its valid values are BlockSourceRpc,
// BlockSourceFile and BlockSourceLightbringer — confirm.
type BlockSourceType int

type FetchStatsSnapshot

// FetchStatsSnapshot is a plain-value snapshot of fetch statistics, as
// returned by BlockSource.GetFetchStats, intended for logging.
// NOTE(review): the counter fields appear to mirror the like-named
// BlockSourceStats fields — confirm.
type FetchStatsSnapshot struct {
	Attempts           uint64
	Successes          uint64
	Retries            uint64
	Skipped            uint64
	SpeculativeRetries uint64 // Backup requests sent for slow slots
	AvgLatencyMs       float64
	ErrNotAvail        uint64
	ErrRateLimit       uint64
	ErrBeyondTip       uint64
	ErrTransient       uint64 // EOF, timeout, 502/503, connection reset, etc.
	ErrOther           uint64
	BufferDepth        int
	LeadSlots          int64 // MaxBufferedSlot - NextSlotToSend
	NextSlot           uint64
	MaxBuffered        uint64
	ConfirmedTip       uint64
	ProcessedTip       uint64 // Processed commitment tip (super tip)
	TipAtSlot          uint64 // What slot we were emitting when tip was measured
	WorkQueueLen       int
	ReorderBufLen      int
	// Tip poll health
	TipStaleSecs      int64  // Seconds since last successful tip update (0 = healthy)
	TipPollFailures   uint64 // Consecutive tip poll failures
	TotalTipPollFails uint64 // Total tip poll failures this window
	// Mode tracking
	IsNearTip bool // True when in near-tip mode (low-latency, just-in-time scheduling)
	// RPS tracking (for RPC credit usage visibility)
	GetBlockRPS float64 // getBlock calls per second over the stats window
	SuccessRate float64 // Percentage of fetches that returned block data (vs skipped)
	WindowSecs  float64 // How long the stats window has been open
	// Stall diagnostics (for STALL log in replay loop)
	WaitingSlotState   string // "inflight", "done", "pending", "missing"
	WaitingSlotRetries int    // How many times the waiting slot has been retried
	InflightCount      int    // Number of slots currently being fetched
	RetryQueueLen      int    // Number of slots waiting to be retried
}

FetchStatsSnapshot is a snapshot of fetch stats for logging.

type StallDiagnostics

// StallDiagnostics captures comprehensive state when a stall is detected.
// It groups the waiting slot's context, a snapshot of the scheduler queues,
// RPC failover state, error counters and worker-pool settings in one value.
type StallDiagnostics struct {
	// Waiting slot context
	WaitingSlot      uint64
	LastExecutedSlot uint64
	ConfirmedTip     uint64
	// NOTE(review): presumably the distance between ConfirmedTip and replay
	// progress; the exact operands are not visible here — confirm.
	Gap              uint64
	Mode             string // "near-tip" or "catchup"
	LastProgressTs   time.Time
	StallElapsed     time.Duration

	// Slot state snapshot
	InflightCount    int
	RetryQueueLen    int
	WorkQueueLen     int
	ReorderBufLen    int
	SkippedSlotsLen  int
	MaxBufferedSlot  uint64
	WaitingSlotState string // "inflight", "done", "pending", "missing"

	// Waiting slot error info
	// slotErrorInfo is an unexported type, so code outside this package can
	// read this field but cannot name its type.
	WaitingSlotErrors *slotErrorInfo

	// RPC health snapshot
	ActiveRpcIdx     int32
	ActiveRpcURL     string
	FailoverCount    uint64
	LastFailoverTime time.Time
	IsOnPrimary      bool

	// Per-RPC error counts (current window)
	// ErrHardConn has no counterpart among the BlockSourceStats error buckets.
	ErrSlotNotAvail uint64
	ErrRateLimited  uint64
	ErrBeyondTip    uint64
	ErrTransient    uint64
	ErrHardConn     uint64
	ErrOther        uint64

	// Worker pool stats
	WorkersTotal int
	RateLimitRPS float64
}

StallDiagnostics captures comprehensive state when a stall is detected

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL