Documentation
¶
Index ¶
- Constants
- type BlockSource
- func NewBlockSource(opts *BlockSourceOpts) *BlockSource
- func (bs *BlockSource) BufferDepth() int
- func (bs *BlockSource) DownloadInitialBlocks()
- func (bs *BlockSource) GetFetchStats() FetchStatsSnapshot
- func (bs *BlockSource) NextBlock() *b.Block
- func (bs *BlockSource) NotifyBlockStart(slot uint64)
- func (bs *BlockSource) RefreshTipsForSummary()
- func (bs *BlockSource) ResetStats()
- func (bs *BlockSource) SetLastExecutedSlot(slot uint64)
- func (bs *BlockSource) StallTimeout() time.Duration
- func (bs *BlockSource) Stalled() bool
- func (bs *BlockSource) Start()
- type BlockSourceOpts
- type BlockSourceStats
- type BlockSourceType
- type FetchStatsSnapshot
- type StallDiagnostics
Constants ¶
// Kinds of block source, numbered consecutively from zero via iota.
// NOTE(review): presumably these are the values intended for
// BlockSourceOpts.SourceType, but they are declared as untyped constants
// rather than as BlockSourceType — confirm.
const (
	BlockSourceRpc = iota
	BlockSourceFile
	BlockSourceLightbringer
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BlockSource ¶
type BlockSource struct {
// contains filtered or unexported fields
}
func NewBlockSource ¶
func NewBlockSource(opts *BlockSourceOpts) *BlockSource
func (*BlockSource) BufferDepth ¶
func (bs *BlockSource) BufferDepth() int
func (*BlockSource) DownloadInitialBlocks ¶
func (bs *BlockSource) DownloadInitialBlocks()
DownloadInitialBlocks is kept for backward compatibility but is a no-op. The parallel scheduler handles initial prefetch naturally.
func (*BlockSource) GetFetchStats ¶
func (bs *BlockSource) GetFetchStats() FetchStatsSnapshot
GetFetchStats returns a snapshot of current fetch statistics.
func (*BlockSource) NextBlock ¶
func (bs *BlockSource) NextBlock() *b.Block
func (*BlockSource) NotifyBlockStart ¶
func (bs *BlockSource) NotifyBlockStart(slot uint64)
NotifyBlockStart is called at the START of block execution. In near-tip mode, this triggers fetching N+1 so the RPC latency (~200ms) overlaps with execution time, hiding the wait from the user.
NOTE: We can't use canScheduleMore() here because lastExecutedSlot hasn't been updated yet (we're about to execute, not finished). Instead, we directly check if the next slot is already scheduled/done and schedule it if not.
func (*BlockSource) RefreshTipsForSummary ¶
func (bs *BlockSource) RefreshTipsForSummary()
RefreshTipsForSummary triggers an async refresh of both confirmed and processed tips. Call this near summary time (e.g., at slot 95) so fresh tips are ready at slot 100. This is non-blocking - it spawns a goroutine to do the work.
func (*BlockSource) ResetStats ¶
func (bs *BlockSource) ResetStats()
ResetStats resets the fetch statistics (useful between 100-slot windows).
func (*BlockSource) SetLastExecutedSlot ¶
func (bs *BlockSource) SetLastExecutedSlot(slot uint64)
SetLastExecutedSlot is called by the replay loop after each block is fully executed. This allows accurate tip distance calculation without blocking on replay progress. Also triggers mode switching based on replay progress (not just tip polling).
In near-tip mode, this also schedules N+2 (prefetching it while N+1 executes) and immediately retries N+1 if it failed, rather than waiting for the 200ms ticker.
func (*BlockSource) StallTimeout ¶
func (bs *BlockSource) StallTimeout() time.Duration
StallTimeout returns the configured stall timeout duration.
func (*BlockSource) Stalled ¶
func (bs *BlockSource) Stalled() bool
Stalled returns true if the block source stalled due to persistent fetch failures. When true, the caller should trigger a graceful shutdown to preserve AccountsDB state.
type BlockSourceOpts ¶
// BlockSourceOpts holds the options accepted by NewBlockSource.
// For every tuning field below, a value of 0 selects the default.
type BlockSourceOpts struct {
	// RpcClient is the primary RPC used for block fetching (getBlock).
	RpcClient *rpcclient.RpcClient

	SourceType BlockSourceType
	StartSlot  uint64
	EndSlot    uint64
	BlockDir   string

	// BackupRpcEndpoints lists optional backup RPC endpoints for failover.
	// They are tried in order when the primary fails with a hard
	// connectivity error (connection refused, no such host, etc.); they are
	// NOT used for timeouts or rate limits. After 100 slots the primary is
	// retried and restored if it is working.
	BackupRpcEndpoints []string

	// Parallel fetch settings.

	// MaxRPS is the rate limit in requests per second; 0 = use default.
	MaxRPS int
	// MaxInflight is the maximum number of concurrent workers; 0 = use default.
	MaxInflight int
	// TipPollMs is the tip poll interval in milliseconds; 0 = use default.
	TipPollMs int
	// TipSafetyMargin prevents fetching within this many slots of the tip;
	// 0 = use default.
	TipSafetyMargin uint64

	// Mode thresholds (with hysteresis).

	// NearTipThreshold: near-tip mode is entered when the gap is <= this
	// value; 0 = use default.
	NearTipThreshold int
	// CatchupThreshold: near-tip mode is exited when the gap is >= this
	// value; 0 = use default.
	CatchupThreshold int

	// CatchupTipGateThreshold is the tip gate: the safety margin is applied
	// only when the gap is greater than this value; 0 = use default.
	CatchupTipGateThreshold int

	// Near-tip tuning.

	// NearTipPollMs is the faster poll interval used in near-tip mode;
	// 0 = use default.
	NearTipPollMs int
	// NearTipLookahead is the number of slots ahead to schedule in near-tip
	// mode; 0 = use default.
	NearTipLookahead int
}
type BlockSourceStats ¶
// BlockSourceStats groups the counters kept for parallel block fetching.
// Every counter is an atomic.Uint64.
type BlockSourceStats struct {
	// Fetch outcome counts.
	FetchAttempts  atomic.Uint64
	FetchSuccesses atomic.Uint64
	FetchRetries   atomic.Uint64
	FetchSkipped   atomic.Uint64

	// Speculative (backup) requests.
	SpeculativeRetries atomic.Uint64 // backup requests sent for slow slots

	// Errors, bucketed by kind.
	ErrSlotNotAvail atomic.Uint64
	ErrRateLimited  atomic.Uint64
	ErrBeyondTip    atomic.Uint64
	ErrTransient    atomic.Uint64 // EOF, timeout, 502/503, connection reset, etc.
	ErrOther        atomic.Uint64

	// Latency accounting, in nanoseconds.
	TotalFetchLatencyNs atomic.Uint64
	FetchLatencyCount   atomic.Uint64

	// Buffer statistics.
	MaxBufferedSlot atomic.Uint64
}
BlockSourceStats contains metrics for parallel block fetching.
type BlockSourceType ¶
// BlockSourceType is the type of the SourceType field in BlockSourceOpts.
type BlockSourceType int
type FetchStatsSnapshot ¶
// FetchStatsSnapshot is a snapshot of the fetch statistics, held in plain
// value fields and intended for logging.
type FetchStatsSnapshot struct {
	// Fetch counts and latency.
	Attempts           uint64
	Successes          uint64
	Retries            uint64
	Skipped            uint64
	SpeculativeRetries uint64 // backup requests sent for slow slots
	AvgLatencyMs       float64

	// Errors, bucketed by kind.
	ErrNotAvail  uint64
	ErrRateLimit uint64
	ErrBeyondTip uint64
	ErrTransient uint64 // EOF, timeout, 502/503, connection reset, etc.
	ErrOther     uint64

	// Buffer, position, and tip values.
	BufferDepth   int
	LeadSlots     int64 // MaxBufferedSlot - NextSlotToSend
	NextSlot      uint64
	MaxBuffered   uint64
	ConfirmedTip  uint64
	ProcessedTip  uint64 // tip at processed commitment (super tip)
	TipAtSlot     uint64 // slot being emitted when the tip was measured
	WorkQueueLen  int
	ReorderBufLen int

	// Health of tip polling.
	TipStaleSecs      int64  // seconds since the last successful tip update (0 = healthy)
	TipPollFailures   uint64 // consecutive tip poll failures
	TotalTipPollFails uint64 // total tip poll failures in this window

	// Mode tracking.
	IsNearTip bool // true while in near-tip mode (low-latency, just-in-time scheduling)

	// Request-rate tracking, for visibility into RPC credit usage.
	GetBlockRPS float64 // getBlock calls per second over the stats window
	SuccessRate float64 // percentage of fetches that returned block data (vs skipped)
	WindowSecs  float64 // how long the stats window has been open

	// Stall diagnostics, used for the STALL log in the replay loop.
	WaitingSlotState   string // "inflight", "done", "pending", "missing"
	WaitingSlotRetries int    // number of times the waiting slot has been retried
	InflightCount      int    // number of slots currently being fetched
	RetryQueueLen      int    // number of slots waiting to be retried
}
FetchStatsSnapshot is a snapshot of fetch stats for logging.
type StallDiagnostics ¶
// StallDiagnostics is the state captured when a stall is detected.
type StallDiagnostics struct {
	// Context for the slot being waited on.
	WaitingSlot      uint64
	LastExecutedSlot uint64
	ConfirmedTip     uint64
	Gap              uint64
	Mode             string // "near-tip" or "catchup"
	LastProgressTs   time.Time
	StallElapsed     time.Duration

	// Snapshot of slot state.
	InflightCount    int
	RetryQueueLen    int
	WorkQueueLen     int
	ReorderBufLen    int
	SkippedSlotsLen  int
	MaxBufferedSlot  uint64
	WaitingSlotState string // "inflight", "done", "pending", "missing"

	// Error information for the waiting slot.
	WaitingSlotErrors *slotErrorInfo

	// Snapshot of RPC health.
	ActiveRpcIdx     int32
	ActiveRpcURL     string
	FailoverCount    uint64
	LastFailoverTime time.Time
	IsOnPrimary      bool

	// Per-RPC error counts for the current window.
	ErrSlotNotAvail uint64
	ErrRateLimited  uint64
	ErrBeyondTip    uint64
	ErrTransient    uint64
	ErrHardConn     uint64
	ErrOther        uint64

	// Worker pool statistics.
	WorkersTotal int
	RateLimitRPS float64
}
StallDiagnostics captures comprehensive state when a stall is detected.