Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ErrInvalidTransition = errors.New("invalid state transition")
ErrInvalidTransition is returned when a state transition is invalid.
var ErrResultAbandoned = fmt.Errorf("result abandoned")
ErrResultAbandoned is returned when calling one of the methods after the result has been abandoned. Not exported because this is not an expected error condition.
Functions ¶
This section is empty.
Types ¶
type Core ¶
type Core interface {
// Download retrieves all necessary data for processing from the network.
// Download will block until the data is successfully downloaded, and has not internal timeout.
// When Aboandon is called, the caller must cancel the context passed in to shutdown the operation
// otherwise it may block indefinitely.
//
// Expected error returns during normal operation:
// - [context.Canceled]: if the provided context was canceled before completion
Download(ctx context.Context) error
// Index processes the downloaded data and stores it into in-memory indexes.
// Must be called after Download.
//
// No error returns are expected during normal operations
Index() error
// Persist stores the indexed data in permanent storage.
// Must be called after Index.
//
// No error returns are expected during normal operations
Persist() error
// Abandon indicates that the protocol has abandoned this state. Hence processing will be aborted
// and any data dropped.
// This method will block until other in-progress operations are complete. If Download is in progress,
// the caller should cancel its context to ensure the operation completes in a timely manner.
Abandon()
}
<component_spec> Core defines the interface for pipelined execution result processing. There are 3 main steps which must be completed sequentially and exactly once. 1. Download the BlockExecutionData and TransactionResultErrorMessages for the execution result. 2. Index the downloaded data into mempools. 3. Persist the indexed data to into persisted storage.
If the protocol abandons the execution result, Abandon() is called to signal to the Core instance that processing will stop and any data accumulated may be discarded. Abandon() may be called at any time, but may block until in-progress operations are complete. </component_spec>
All exported methods are safe for concurrent use.
type CoreFactory ¶
type CoreFactory interface {
NewCore(result *flow.ExecutionResult) Core
}
CoreFactory is a factory object for creating new Core instances.
type Criteria ¶
type Criteria struct {
// AgreeingExecutorsCount is the number of receipts including the same ExecutionResult
AgreeingExecutorsCount uint
// RequiredExecutors is the list of EN node IDs, one of which must have produced the result
RequiredExecutors flow.IdentifierList
}
Criteria defines the filtering criteria for execution result queries. It specifies requirements for execution result selection including the number of agreeing executors and requires executor nodes.
func (*Criteria) OverrideWith ¶
OverrideWith overrides the original criteria with the incoming criteria, returning a new Criteria object. Fields from `override` criteria take precedence when set.
type ExecutionResultQueryProvider ¶
type ExecutionResultQueryProvider interface {
// ExecutionResultQuery retrieves execution results and associated execution nodes for a given block ID
// based on the provided criteria. It returns a Query containing the execution result and
// the execution nodes that produced it.
//
// Expected errors during normal operations:
// - backend.InsufficientExecutionReceipts - found insufficient receipts for given block ID.
ExecutionResultQuery(blockID flow.Identifier, criteria Criteria) (*Query, error)
}
ExecutionResultQueryProvider provides execution results and execution nodes based on criteria. It allows querying for execution results by block ID with specific filtering criteria to ensure consistency and reliability of execution results.
type ExecutionStateCache ¶
type ExecutionStateCache interface {
// Snapshot returns a view of the execution state as of the provided ExecutionResult.
// The returned Snapshot provides access to execution state data for the fork ending
// on the provided ExecutionResult which extends from the latest sealed result.
// The result may be sealed or unsealed. Only data for finalized blocks is available.
//
// Expected errors during normal operation:
// - storage.ErrNotFound - result is not available, not ready for querying, or does not descend from the latest sealed result.
// - All other errors are potential indicators of bugs or corrupted internal state (continuation impossible)
Snapshot(executionResultID flow.Identifier) (Snapshot, error)
}
ExecutionStateCache provides access to execution state snapshots for querying data at specific ExecutionResults.
type Pipeline ¶
type Pipeline interface {
PipelineStateProvider
// Run starts the pipeline processing and blocks until completion or context cancellation.
// CAUTION: not concurrency safe! Run must only be called once.
//
// Expected Errors:
// - context.Canceled: when the context is canceled
// - All other errors are potential indicators of bugs or corrupted internal state (continuation impossible)
Run(ctx context.Context, core Core, parentState State) error
// SetSealed marks the pipeline's result as sealed, which enables transitioning from StateWaitingPersist to StatePersisting.
SetSealed()
// OnParentStateUpdated updates the pipeline's parent's state.
OnParentStateUpdated(parentState State)
// Abandon marks the pipeline as abandoned.
Abandon()
}
Pipeline represents a processing pipelined state machine for a single ExecutionResult. The state machine is initialized in the Pending state.
The state machine is designed to be run in a single goroutine. The Run method must only be called once.
type PipelineFactory ¶
type PipelineFactory interface {
NewPipeline(result *flow.ExecutionResult, isSealed bool) Pipeline
}
PipelineFactory is a factory object for creating new Pipeline instances.
type PipelineStateConsumer ¶
type PipelineStateConsumer interface {
// OnStateUpdated is called when a pipeline's state changes to notify the receiver of the new state.
// This method is will be called in the same goroutine that runs the pipeline, so it must not block.
OnStateUpdated(newState State)
}
PipelineStateConsumer is a receiver of the pipeline state updates. PipelineStateConsumer implementations must be - NON-BLOCKING and consume the state updates without noteworthy delay
type PipelineStateProvider ¶
type PipelineStateProvider interface {
// GetState returns the current state of the pipeline.
GetState() State
}
PipelineStateProvider is an interface that provides a pipeline's state.
type Query ¶
type Query struct {
// ExecutionResult is the execution result for the queried block
ExecutionResult *flow.ExecutionResult
// ExecutionNodes is the list of execution node identities that produced the result
ExecutionNodes flow.IdentitySkeletonList
}
Query contains the result of an execution result query. It includes both the execution result and the execution nodes that produced it.
type Snapshot ¶
type Snapshot interface {
// Events returns a reader for querying event data.
Events() storage.EventsReader
// Collections returns a reader for querying collection data.
Collections() storage.CollectionsReader
// Transactions returns a reader for querying transaction data.
Transactions() storage.TransactionsReader
// LightTransactionResults returns a reader for querying light transaction result data.
LightTransactionResults() storage.LightTransactionResultsReader
// TransactionResultErrorMessages returns a reader for querying transaction error message data.
TransactionResultErrorMessages() storage.TransactionResultErrorMessagesReader
// Registers returns a reader for querying register data.
Registers() storage.RegisterIndexReader
}
Snapshot provides access to execution data readers for querying various data types from a specific ExecutionResult.
type State ¶
type State int32
State represents the state of the processing pipeline
const ( // StatePending is the initial state after instantiation, before Run is called StatePending State = iota // StateProcessing represents the state where data processing (download and indexing) has been started StateProcessing // StateWaitingPersist represents the state where all data is indexed, but conditions to persist are not met StateWaitingPersist // StateComplete represents the state where all data is persisted to storage StateComplete // StateAbandoned represents the state where processing was aborted StateAbandoned )
func (State) IsTerminal ¶
IsTerminal returns true if the state is a terminal state (Complete or Abandoned).