Documentation ¶
Index ¶
- Constants
- Variables
- func NewMockStateConsumer() *mockStateConsumer
- func NewMockStateProvider() *mockStateProvider
- type Core
- type CoreFactory
- type CoreImpl
- type Criteria
- type ExecutionResultQueryProvider
- type ExecutionStateCache
- type Pipeline
- type PipelineFactory
- type PipelineImpl
- type PipelineStateConsumer
- type PipelineStateProvider
- type Query
- type Snapshot
- type State
Constants ¶
const DefaultTxResultErrMsgsRequestTimeout = 5 * time.Second
DefaultTxResultErrMsgsRequestTimeout is the default timeout for requesting transaction result error messages.
Variables ¶
var (
	// ErrInvalidTransition is returned when a state transition is invalid.
	ErrInvalidTransition = errors.New("invalid state transition")
)
Functions ¶
func NewMockStateConsumer ¶
func NewMockStateConsumer() *mockStateConsumer
NewMockStateConsumer creates a new instance of mockStateConsumer with a buffered channel.
func NewMockStateProvider ¶
func NewMockStateProvider() *mockStateProvider
NewMockStateProvider initializes a mockStateProvider with the default state StatePending.
Types ¶
type Core ¶
type Core interface {
	// Download retrieves all necessary data for processing from the network.
	// Download will block until the data is successfully downloaded, and has no internal timeout.
	// When Abandon is called, the caller must cancel the context passed in to shut down the operation,
	// otherwise it may block indefinitely.
	//
	// Expected error returns during normal operation:
	// - [context.Canceled]: if the provided context was canceled before completion
	Download(ctx context.Context) error

	// Index processes the downloaded data and stores it into in-memory indexes.
	// Must be called after Download.
	//
	// No error returns are expected during normal operations
	Index() error

	// Persist stores the indexed data in permanent storage.
	// Must be called after Index.
	//
	// No error returns are expected during normal operations
	Persist() error

	// Abandon indicates that the protocol has abandoned this state. Hence processing will be aborted
	// and any data dropped.
	// This method will block until other in-progress operations are complete. If Download is in progress,
	// the caller should cancel its context to ensure the operation completes in a timely manner.
	Abandon()
}
<component_spec> Core defines the interface for pipelined execution result processing. There are 3 main steps which must be completed sequentially and exactly once:
1. Download the BlockExecutionData and TransactionResultErrorMessages for the execution result.
2. Index the downloaded data into mempools.
3. Persist the indexed data into persistent storage.
If the protocol abandons the execution result, Abandon() is called to signal to the Core instance that processing will stop and any accumulated data may be discarded. Abandon() may be called at any time, but may block until in-progress operations are complete. </component_spec>
All exported methods are safe for concurrent use.
type CoreFactory ¶
type CoreFactory interface {
	NewCore(result *flow.ExecutionResult) Core
}
CoreFactory is a factory object for creating new Core instances.
type CoreImpl ¶
type CoreImpl struct {
	// contains filtered or unexported fields
}
CoreImpl implements the Core interface for processing execution data. It coordinates the download, indexing, and persisting of execution data.
Safe for concurrent use.
func NewCoreImpl ¶
func NewCoreImpl(
	logger zerolog.Logger,
	executionResult *flow.ExecutionResult,
	block *flow.Block,
	execDataRequester requester.ExecutionDataRequester,
	txResultErrMsgsRequester tx_error_messages.Requester,
	txResultErrMsgsRequestTimeout time.Duration,
	persistentRegisters storage.RegisterIndex,
	persistentEvents storage.Events,
	persistentCollections storage.Collections,
	persistentResults storage.LightTransactionResults,
	persistentTxResultErrMsg storage.TransactionResultErrorMessages,
	latestPersistedSealedResult storage.LatestPersistedSealedResult,
	protocolDB storage.DB,
	lockManager storage.LockManager,
) (*CoreImpl, error)
NewCoreImpl creates a new CoreImpl with all necessary dependencies. Safe for concurrent use.
No error returns are expected during normal operations
func (*CoreImpl) Abandon ¶
func (c *CoreImpl) Abandon()
Abandon indicates that the protocol has abandoned this state. Hence processing will be aborted and any data dropped. This method will block until other in-progress operations are complete. If Download is in progress, the caller should cancel its context to ensure the operation completes in a timely manner.
The method is idempotent. Calling it multiple times has no effect.
func (*CoreImpl) Download ¶
Download retrieves all necessary data for processing from the network. Download will block until the data is successfully downloaded, and has no internal timeout. When Abandon is called, the caller must cancel the context passed in to shut down the operation, otherwise it may block indefinitely.
The method may only be called once. Calling it multiple times will return an error. Calling Download after Abandon is called will return an error.
Expected error returns during normal operation:
- context.Canceled: if the provided context was canceled before completion
func (*CoreImpl) Index ¶
Index processes the downloaded data and stores it into in-memory indexes. Must be called after Download.
The method may only be called once. Calling it multiple times will return an error. Calling Index after Abandon is called will return an error.
No error returns are expected during normal operations
func (*CoreImpl) Persist ¶
Persist stores the indexed data in permanent storage. Must be called after Index.
The method may only be called once. Calling it multiple times will return an error. Calling Persist after Abandon is called will return an error.
No error returns are expected during normal operations
type Criteria ¶
type Criteria struct {
	// AgreeingExecutorsCount is the number of receipts including the same ExecutionResult
	AgreeingExecutorsCount uint
	// RequiredExecutors is the list of EN node IDs, one of which must have produced the result
	RequiredExecutors flow.IdentifierList
}
Criteria defines the filtering criteria for execution result queries. It specifies requirements for execution result selection, including the number of agreeing executors and the required executor nodes.
func (*Criteria) OverrideWith ¶
OverrideWith overrides the original criteria with the incoming criteria, returning a new Criteria object. Fields from `override` criteria take precedence when set.
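The override semantics might look like the sketch below. It rests on two assumptions not confirmed by this documentation: a field counts as "set" when it is non-zero (count) or non-empty (list), and node IDs are simplified to plain strings instead of `flow.Identifier`.

```go
package main

import "fmt"

// Criteria is a simplified local stand-in for the documented struct.
// Node IDs are plain strings here (assumption for the sketch).
type Criteria struct {
	AgreeingExecutorsCount uint
	RequiredExecutors      []string
}

// OverrideWith returns a new Criteria in which fields set on `override`
// take precedence. ASSUMPTION: "set" means a non-zero count or a
// non-empty list. The real method may define it differently.
func (c *Criteria) OverrideWith(override Criteria) Criteria {
	out := *c
	if override.AgreeingExecutorsCount > 0 {
		out.AgreeingExecutorsCount = override.AgreeingExecutorsCount
	}
	if len(override.RequiredExecutors) > 0 {
		// Copy so the result does not alias the override's slice.
		out.RequiredExecutors = append([]string(nil), override.RequiredExecutors...)
	}
	return out
}

func main() {
	base := Criteria{AgreeingExecutorsCount: 2, RequiredExecutors: []string{"en1"}}
	merged := base.OverrideWith(Criteria{AgreeingExecutorsCount: 3})
	fmt.Printf("%+v\n", merged)
}
```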
type ExecutionResultQueryProvider ¶
type ExecutionResultQueryProvider interface {
	// ExecutionResultQuery retrieves execution results and associated execution nodes for a given block ID
	// based on the provided criteria. It returns a Query containing the execution result and
	// the execution nodes that produced it.
	//
	// Expected errors during normal operations:
	// - backend.InsufficientExecutionReceipts - found insufficient receipts for given block ID.
	ExecutionResultQuery(blockID flow.Identifier, criteria Criteria) (*Query, error)
}
ExecutionResultQueryProvider provides execution results and execution nodes based on criteria. It allows querying for execution results by block ID with specific filtering criteria to ensure consistency and reliability of execution results.
type ExecutionStateCache ¶
type ExecutionStateCache interface {
	// Snapshot returns a view of the execution state as of the provided ExecutionResult.
	// The returned Snapshot provides access to execution state data for the fork ending
	// on the provided ExecutionResult which extends from the latest sealed result.
	// The result may be sealed or unsealed. Only data for finalized blocks is available.
	//
	// Expected errors during normal operation:
	// - storage.ErrNotFound - result is not available, not ready for querying, or does not descend from the latest sealed result.
	// - All other errors are potential indicators of bugs or corrupted internal state (continuation impossible)
	Snapshot(executionResultID flow.Identifier) (Snapshot, error)
}
ExecutionStateCache provides access to execution state snapshots for querying data at specific ExecutionResults.
type Pipeline ¶
type Pipeline interface {
	PipelineStateProvider

	// Run starts the pipeline processing and blocks until completion or context cancellation.
	// CAUTION: not concurrency safe! Run must only be called once.
	//
	// Expected Errors:
	// - context.Canceled: when the context is canceled
	// - All other errors are potential indicators of bugs or corrupted internal state (continuation impossible)
	Run(ctx context.Context, core Core, parentState State) error

	// SetSealed marks the pipeline's result as sealed, which enables transitioning from StateWaitingPersist to StatePersisting.
	SetSealed()

	// OnParentStateUpdated updates the pipeline's parent's state.
	OnParentStateUpdated(parentState State)

	// Abandon marks the pipeline as abandoned.
	Abandon()
}
Pipeline represents a pipelined processing state machine for a single ExecutionResult. The state machine is initialized in the Pending state.
The state machine is designed to be run in a single goroutine. The Run method must only be called once.
type PipelineFactory ¶
type PipelineFactory interface {
	NewPipeline(result *flow.ExecutionResult, isSealed bool) Pipeline
}
PipelineFactory is a factory object for creating new Pipeline instances.
type PipelineImpl ¶
type PipelineImpl struct {
	// contains filtered or unexported fields
}
PipelineImpl implements the Pipeline interface.
func NewPipeline ¶
func NewPipeline(
	log zerolog.Logger,
	executionResult *flow.ExecutionResult,
	isSealed bool,
	stateReceiver PipelineStateConsumer,
) *PipelineImpl
NewPipeline creates a new processing pipeline. The pipeline is initialized in the Pending state.
func (*PipelineImpl) Abandon ¶
func (p *PipelineImpl) Abandon()
Abandon marks the pipeline as abandoned. This will cause the pipeline to eventually transition to the Abandoned state and halt processing.
func (*PipelineImpl) GetState ¶
func (p *PipelineImpl) GetState() State
GetState returns the current state of the pipeline.
func (*PipelineImpl) OnParentStateUpdated ¶
func (p *PipelineImpl) OnParentStateUpdated(parentState State)
OnParentStateUpdated updates the pipeline's state based on the provided parent state. If the parent state has changed, it will notify the state consumer and trigger a state change notification.
func (*PipelineImpl) Run ¶
Run starts the pipeline processing and blocks until completion or context cancellation. CAUTION: not concurrency safe! Run must only be called once.
Expected Errors:
- context.Canceled: when the context is canceled
- All other errors are potential indicators of bugs or corrupted internal state (continuation impossible)
func (*PipelineImpl) SetSealed ¶
func (p *PipelineImpl) SetSealed()
SetSealed marks the execution result as sealed. This will cause the pipeline to eventually transition to the StateComplete state when the parent finishes processing.
type PipelineStateConsumer ¶
type PipelineStateConsumer interface {
	// OnStateUpdated is called when a pipeline's state changes to notify the receiver of the new state.
	// This method will be called in the same goroutine that runs the pipeline, so it must not block.
	OnStateUpdated(newState State)
}
PipelineStateConsumer is a receiver of pipeline state updates. PipelineStateConsumer implementations must be NON-BLOCKING and consume the state updates without noteworthy delay.
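One common way to satisfy a non-blocking requirement like this is a buffered channel combined with `select` and a `default` branch. The sketch below is an assumption-laden illustration: `chanConsumer` and `newChanConsumer` are hypothetical, `State` is redeclared locally, and dropping updates when the buffer is full is a trade-off chosen for the example, not documented behavior.

```go
package main

import "fmt"

// State is a local stand-in for the documented State type.
type State int32

// chanConsumer is a hypothetical consumer that forwards updates to a buffered
// channel. It assumes OnStateUpdated is only called from the single goroutine
// that runs the pipeline, so `dropped` needs no synchronization.
type chanConsumer struct {
	updates chan State
	dropped int
}

func newChanConsumer(size int) *chanConsumer {
	return &chanConsumer{updates: make(chan State, size)}
}

// OnStateUpdated never blocks: if the buffer is full, the update is counted
// as dropped instead of stalling the caller.
func (c *chanConsumer) OnStateUpdated(newState State) {
	select {
	case c.updates <- newState:
	default:
		c.dropped++
	}
}

func main() {
	consumer := newChanConsumer(2)
	for s := State(0); s < 3; s++ {
		consumer.OnStateUpdated(s)
	}
	fmt.Println("buffered:", len(consumer.updates), "dropped:", consumer.dropped)
}
```

A consumer that cannot tolerate lost updates would need a different design, for example keeping only the latest state, since state updates here supersede one another.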
type PipelineStateProvider ¶
type PipelineStateProvider interface {
	// GetState returns the current state of the pipeline.
	GetState() State
}
PipelineStateProvider is an interface that provides a pipeline's state.
type Query ¶
type Query struct {
	// ExecutionResult is the execution result for the queried block
	ExecutionResult *flow.ExecutionResult
	// ExecutionNodes is the list of execution node identities that produced the result
	ExecutionNodes flow.IdentitySkeletonList
}
Query contains the result of an execution result query. It includes both the execution result and the execution nodes that produced it.
type Snapshot ¶
type Snapshot interface {
	// Events returns a reader for querying event data.
	Events() storage.EventsReader
	// Collections returns a reader for querying collection data.
	Collections() storage.CollectionsReader
	// Transactions returns a reader for querying transaction data.
	Transactions() storage.TransactionsReader
	// LightTransactionResults returns a reader for querying light transaction result data.
	LightTransactionResults() storage.LightTransactionResultsReader
	// TransactionResultErrorMessages returns a reader for querying transaction error message data.
	TransactionResultErrorMessages() storage.TransactionResultErrorMessagesReader
	// Registers returns a reader for querying register data.
	Registers() storage.RegisterIndexReader
}
Snapshot provides access to execution data readers for querying various data types from a specific ExecutionResult.
type State ¶
type State int32
State represents the state of the processing pipeline.
const (
	// StatePending is the initial state after instantiation, before Run is called
	StatePending State = iota
	// StateProcessing represents the state where data processing (download and indexing) has been started
	StateProcessing
	// StateWaitingPersist represents the state where all data is indexed, but conditions to persist are not met
	StateWaitingPersist
	// StateComplete represents the state where all data is persisted to storage
	StateComplete
	// StateAbandoned represents the state where processing was aborted
	StateAbandoned
)
func (State) IsTerminal ¶
IsTerminal returns true if the state is a terminal state (Complete or Abandoned).
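A minimal sketch consistent with this description is shown below. The type and constants are redeclared locally from the listing above, and the method body is an assumption derived from the doc comment, not the package's source.

```go
package main

import "fmt"

// State and its constants are redeclared locally from the documented listing.
type State int32

const (
	StatePending State = iota
	StateProcessing
	StateWaitingPersist
	StateComplete
	StateAbandoned
)

// IsTerminal reports whether no further transitions are expected,
// i.e. the state is Complete or Abandoned.
func (s State) IsTerminal() bool {
	return s == StateComplete || s == StateAbandoned
}

func main() {
	for _, s := range []State{StatePending, StateProcessing, StateWaitingPersist, StateComplete, StateAbandoned} {
		fmt.Printf("state %d terminal: %t\n", s, s.IsTerminal())
	}
}
```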