Documentation
Constants ¶
const DefaultTxResultErrMsgsRequestTimeout = 5 * time.Second
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Core ¶
type Core interface {
	// Download retrieves all necessary data for processing.
	// CAUTION: not concurrency safe!
	//
	// Expected errors:
	//   - context.Canceled: if the provided context was canceled before completion
	//   - context.DeadlineExceeded: if the provided context's deadline expired before completion
	//   - All other errors are potential indicators of bugs or corrupted internal state (continuation impossible)
	Download(ctx context.Context) error

	// Index processes the downloaded data and creates in-memory indexes.
	// CAUTION: not concurrency safe!
	//
	// No errors are expected during normal operations
	Index() error

	// Persist stores the indexed data in permanent storage.
	// CAUTION: not concurrency safe!
	//
	// No errors are expected during normal operations
	Persist() error

	// Abandon indicates that the protocol has abandoned this state. Hence processing will be aborted
	// and any data dropped.
	// CAUTION: The Core instance should not be used after Abandon is called as it could cause panic due to cleared data.
	// CAUTION: not concurrency safe!
	//
	// No errors are expected during normal operations
	Abandon() error
}
Core defines the interface for pipeline processing steps. Each implementation should handle one set of execution data and implement the three processing phases: download, index, and persist. CAUTION: The Core instance should not be used after Abandon is called as it could cause panic due to cleared data.
type CoreImpl ¶
type CoreImpl struct {
// contains filtered or unexported fields
}
CoreImpl implements the Core interface for processing execution data. It coordinates the download, indexing, and persisting of execution data. CAUTION: The CoreImpl instance should not be used after Abandon is called as it could cause panic due to cleared data. CAUTION: not concurrency safe!
func NewCoreImpl ¶
func NewCoreImpl(
	logger zerolog.Logger,
	executionResult *flow.ExecutionResult,
	header *flow.Header,
	execDataRequester requester.ExecutionDataRequester,
	txResultErrMsgsRequester tx_error_messages.Requester,
	txResultErrMsgsRequestTimeout time.Duration,
	persistentRegisters storage.RegisterIndex,
	persistentEvents storage.Events,
	persistentCollections storage.Collections,
	persistentTransactions storage.Transactions,
	persistentResults storage.LightTransactionResults,
	persistentTxResultErrMsg storage.TransactionResultErrorMessages,
	latestPersistedSealedResult storage.LatestPersistedSealedResult,
	protocolDB storage.DB,
) *CoreImpl
NewCoreImpl creates a new CoreImpl with all necessary dependencies. CAUTION: not concurrency safe!
func (*CoreImpl) Abandon ¶
Abandon indicates that the protocol has abandoned this state. Hence processing will be aborted and any data dropped. CAUTION: The CoreImpl instance should not be used after Abandon is called as it could cause panic due to cleared data. CAUTION: not concurrency safe!
No errors are expected during normal operations
func (*CoreImpl) Download ¶
Download downloads the execution data and the transaction result error messages for the block. CAUTION: not concurrency safe!
Expected errors:
- context.Canceled: if the provided context was canceled before completion
- context.DeadlineExceeded: if the provided context's deadline expired before completion
- All other errors are potential indicators of bugs or corrupted internal state (continuation impossible)
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
Pipeline represents a generic processing pipeline with state transitions. It processes data through sequential states: Ready -> Downloading -> Indexing -> WaitingPersist -> Persisting -> Complete, with conditions for each transition.
func NewPipeline ¶
func NewPipeline(
	logger zerolog.Logger,
	isSealed bool,
	executionResult *flow.ExecutionResult,
	core Core,
	stateUpdatePublisher StateUpdatePublisher,
) *Pipeline
NewPipeline creates a new processing pipeline. Pipelines must only be created for ExecutionResults that descend from the latest persisted sealed result. The pipeline is initialized in the Ready state.
Parameters:
- logger: the logger to use for the pipeline
- isSealed: indicates if the pipeline's ExecutionResult is sealed
- executionResult: processed execution result
- core: implements the processing logic for the pipeline
- stateUpdatePublisher: called when the pipeline needs to broadcast state updates
Returns:
- new pipeline object
func (*Pipeline) Run ¶
Run starts the pipeline processing and blocks until completion or context cancellation.
This function handles the progression through the pipeline states, executing the appropriate processing functions at each step.
When the pipeline reaches a terminal state (StateComplete or StateCanceled), the function returns. The function will also return if the provided context is canceled.
Returns an error if any processing step fails with an irrecoverable error. Returns nil if processing completes successfully, reaches a terminal state, or if either the parent or pipeline context is canceled.
func (*Pipeline) SetSealed ¶
func (p *Pipeline) SetSealed()
SetSealed marks the data as sealed, which enables transitioning from StateWaitingPersist to StatePersisting.
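The gating role of the sealed flag can be sketched as below. `sealGate` and `mayPersist` are hypothetical names, and the sketch assumes the flag is the only condition being checked; the real pipeline may require more (for example, information carried in StateUpdate).

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// sealGate is a hypothetical stand-in for the pipeline's sealed flag.
// atomic.Bool (Go 1.19+) lets SetSealed be called from another goroutine.
type sealGate struct {
	sealed atomic.Bool
}

// SetSealed marks the data as sealed.
func (g *sealGate) SetSealed() {
	g.sealed.Store(true)
}

// mayPersist reports whether a pipeline that is waiting to persist may move on.
// Assumption: only the sealed flag is checked here.
func (g *sealGate) mayPersist(waitingPersist bool) bool {
	return waitingPersist && g.sealed.Load()
}

func main() {
	g := &sealGate{}
	fmt.Println("before SetSealed:", g.mayPersist(true))
	g.SetSealed()
	fmt.Println("after SetSealed:", g.mayPersist(true))
}
```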
func (*Pipeline) UpdateState ¶
func (p *Pipeline) UpdateState(update StateUpdate)
UpdateState updates the pipeline's state based on the provided state update.
type State ¶
type State int
State represents the state of the processing pipeline
const (
	// StateReady is the initial state after instantiation and before downloading has begun
	StateReady State = iota
	// StateDownloading represents the state where data download is in progress
	StateDownloading
	// StateIndexing represents the state where data is being indexed
	StateIndexing
	// StateWaitingPersist represents the state where all data is indexed, but conditions to persist are not met
	StateWaitingPersist
	// StatePersisting represents the state where the indexed data is being persisted to storage
	StatePersisting
	// StateComplete represents the state where all data is persisted to storage
	StateComplete
	// StateCanceled represents the state where processing was aborted
	StateCanceled
)
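The sequential path Ready -> Downloading -> Indexing -> WaitingPersist -> Persisting -> Complete can be walked with a small sketch. The constants are redeclared locally so the example compiles on its own; `String`, `isTerminal`, and `nextOnSuccess` are hypothetical helpers, and the conditions that guard each transition in the real pipeline are deliberately left out.

```go
package main

import "fmt"

// State and its constants are redeclared locally to keep the sketch self-contained.
type State int

const (
	StateReady State = iota
	StateDownloading
	StateIndexing
	StateWaitingPersist
	StatePersisting
	StateComplete
	StateCanceled
)

var stateNames = map[State]string{
	StateReady:          "Ready",
	StateDownloading:    "Downloading",
	StateIndexing:       "Indexing",
	StateWaitingPersist: "WaitingPersist",
	StatePersisting:     "Persisting",
	StateComplete:       "Complete",
	StateCanceled:       "Canceled",
}

// String is a hypothetical helper that returns a readable name for the state.
func (s State) String() string {
	if name, ok := stateNames[s]; ok {
		return name
	}
	return fmt.Sprintf("State(%d)", int(s))
}

// isTerminal reports whether no further transitions are possible.
func isTerminal(s State) bool {
	return s == StateComplete || s == StateCanceled
}

// nextOnSuccess returns the state that follows s on the sequential path.
// The boolean is false for terminal or unknown states.
func nextOnSuccess(s State) (State, bool) {
	if s >= StateReady && s < StateComplete {
		return s + 1, true
	}
	return s, false
}

func main() {
	s := StateReady
	for {
		fmt.Println(s)
		next, ok := nextOnSuccess(s)
		if !ok {
			break
		}
		s = next
	}
}
```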
type StateUpdate ¶
type StateUpdate struct {
	// DescendsFromLastPersistedSealed indicates if this pipeline descends from
	// the last persisted sealed result
	DescendsFromLastPersistedSealed bool
	// ParentState contains the state information from the parent pipeline
	ParentState State
}
StateUpdate contains state update information
type StateUpdatePublisher ¶
type StateUpdatePublisher func(update StateUpdate)
StateUpdatePublisher is a function that publishes state updates
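One plausible way to build a publisher is to forward each update to a set of subscribers. The sketch below redeclares the relevant types locally; `fanOut` is a hypothetical helper and does not reflect how this package wires pipelines together.

```go
package main

import "fmt"

// Local copies of the documented types, so the sketch is self-contained.
type State int

const (
	StateReady State = iota
	StateDownloading
	StateIndexing
	StateWaitingPersist
	StatePersisting
	StateComplete
	StateCanceled
)

type StateUpdate struct {
	DescendsFromLastPersistedSealed bool
	ParentState                     State
}

type StateUpdatePublisher func(update StateUpdate)

// fanOut is a hypothetical helper: it returns a publisher that forwards every
// update to each subscriber, in order, on the caller's goroutine.
func fanOut(subscribers ...func(StateUpdate)) StateUpdatePublisher {
	return func(update StateUpdate) {
		for _, subscriber := range subscribers {
			subscriber(update)
		}
	}
}

func main() {
	var received []StateUpdate
	publish := fanOut(func(u StateUpdate) {
		received = append(received, u)
	})
	publish(StateUpdate{DescendsFromLastPersistedSealed: true, ParentState: StateComplete})
	fmt.Println("updates received:", len(received))
}
```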