optimistic_sync

package
v0.43.1-rc.8
Published: Oct 15, 2025 License: AGPL-3.0 Imports: 25 Imported by: 0

Documentation

Constants

const DefaultTxResultErrMsgsRequestTimeout = 5 * time.Second

DefaultTxResultErrMsgsRequestTimeout is the default timeout for requesting transaction result error messages.
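A timeout like this is typically applied by deriving a deadline-bound context for the request. The following is a minimal sketch under that assumption; `requestErrMsgs` and `timedOut` are hypothetical helpers, not part of this package, and `defaultTimeout` only mirrors the documented constant.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// defaultTimeout mirrors DefaultTxResultErrMsgsRequestTimeout.
const defaultTimeout = 5 * time.Second

// requestErrMsgs is a hypothetical stand-in for a network request. It returns
// once `latency` has elapsed or ctx has ended, whichever happens first.
func requestErrMsgs(ctx context.Context, latency time.Duration) error {
	select {
	case <-time.After(latency):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// timedOut reports whether a request with the given latency hits the timeout.
func timedOut(timeout, latency time.Duration) bool {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel() // always release the timer held by the context
	return errors.Is(requestErrMsgs(ctx, latency), context.DeadlineExceeded)
}

func main() {
	fmt.Println(timedOut(defaultTimeout, time.Millisecond))  // fast request
	fmt.Println(timedOut(10*time.Millisecond, time.Second)) // slow request
}
```

The deferred `cancel` matters even when the request succeeds, since it releases resources associated with the derived context.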

Variables

var (
	// ErrInvalidTransition is returned when a state transition is invalid.
	ErrInvalidTransition = errors.New("invalid state transition")
)

Functions

func NewMockStateConsumer

func NewMockStateConsumer() *mockStateConsumer

NewMockStateConsumer creates a new instance of mockStateConsumer with a buffered channel.

func NewMockStateProvider

func NewMockStateProvider() *mockStateProvider

NewMockStateProvider initializes a mockStateProvider with the default state StatePending.

Types

type Core

type Core interface {
	// Download retrieves all necessary data for processing from the network.
	// Download will block until the data is successfully downloaded, and has no internal timeout.
	// When Abandon is called, the caller must cancel the context passed in to shut down the operation
	// otherwise it may block indefinitely.
	//
	// Expected error returns during normal operation:
	// - [context.Canceled]: if the provided context was canceled before completion
	Download(ctx context.Context) error

	// Index processes the downloaded data and stores it into in-memory indexes.
	// Must be called after Download.
	//
	// No error returns are expected during normal operations
	Index() error

	// Persist stores the indexed data in permanent storage.
	// Must be called after Index.
	//
	// No error returns are expected during normal operations
	Persist() error

	// Abandon indicates that the protocol has abandoned this state. Hence processing will be aborted
	// and any data dropped.
	// This method will block until other in-progress operations are complete. If Download is in progress,
	// the caller should cancel its context to ensure the operation completes in a timely manner.
	Abandon()
}

<component_spec> Core defines the interface for pipelined execution result processing. There are 3 main steps which must be completed sequentially and exactly once:

  1. Download the BlockExecutionData and TransactionResultErrorMessages for the execution result.
  2. Index the downloaded data into mempools.
  3. Persist the indexed data into persistent storage.

If the protocol abandons the execution result, Abandon() is called to signal to the Core instance that processing will stop and any data accumulated may be discarded. Abandon() may be called at any time, but may block until in-progress operations are complete. </component_spec>

All exported methods are safe for concurrent use.
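The sequential contract described above can be sketched as follows. This is an illustration only: the `Core` interface is repeated locally so the example compiles on its own, and `recordingCore`, `process`, and `runOrder` are hypothetical names that do not exist in the package.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Core repeats the documented interface so the sketch is self-contained.
type Core interface {
	Download(ctx context.Context) error
	Index() error
	Persist() error
	Abandon()
}

// recordingCore is a hypothetical test double that records the call order.
type recordingCore struct{ calls []string }

func (c *recordingCore) Download(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err // e.g. context.Canceled
	}
	c.calls = append(c.calls, "download")
	return nil
}
func (c *recordingCore) Index() error   { c.calls = append(c.calls, "index"); return nil }
func (c *recordingCore) Persist() error { c.calls = append(c.calls, "persist"); return nil }
func (c *recordingCore) Abandon()       { c.calls = append(c.calls, "abandon") }

// process runs each step once, in order, and stops at the first error.
func process(ctx context.Context, core Core) error {
	if err := core.Download(ctx); err != nil {
		return fmt.Errorf("download failed: %w", err)
	}
	if err := core.Index(); err != nil {
		return fmt.Errorf("index failed: %w", err)
	}
	if err := core.Persist(); err != nil {
		return fmt.Errorf("persist failed: %w", err)
	}
	return nil
}

// runOrder drives a recordingCore and returns the observed call order.
func runOrder() string {
	core := &recordingCore{}
	if err := process(context.Background(), core); err != nil {
		return "error: " + err.Error()
	}
	return strings.Join(core.calls, ",")
}

func main() {
	fmt.Println(runOrder())
}
```

In the real package the Pipeline decides when each step runs (for example, persisting waits until the result is sealed), so a plain sequential driver like `process` is a simplification.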

type CoreFactory

type CoreFactory interface {
	NewCore(result *flow.ExecutionResult) Core
}

CoreFactory is a factory object for creating new Core instances.

type CoreImpl

type CoreImpl struct {
	// contains filtered or unexported fields
}

CoreImpl implements the Core interface for processing execution data. It coordinates the download, indexing, and persisting of execution data.

Safe for concurrent use.

func NewCoreImpl

func NewCoreImpl(
	logger zerolog.Logger,
	executionResult *flow.ExecutionResult,
	block *flow.Block,
	execDataRequester requester.ExecutionDataRequester,
	txResultErrMsgsRequester tx_error_messages.Requester,
	txResultErrMsgsRequestTimeout time.Duration,
	persistentRegisters storage.RegisterIndex,
	persistentEvents storage.Events,
	persistentCollections storage.Collections,
	persistentResults storage.LightTransactionResults,
	persistentTxResultErrMsg storage.TransactionResultErrorMessages,
	latestPersistedSealedResult storage.LatestPersistedSealedResult,
	protocolDB storage.DB,
	lockManager storage.LockManager,
) (*CoreImpl, error)

NewCoreImpl creates a new CoreImpl with all necessary dependencies. Safe for concurrent use.

No error returns are expected during normal operations

func (*CoreImpl) Abandon

func (c *CoreImpl) Abandon()

Abandon indicates that the protocol has abandoned this state. Hence processing will be aborted and any data dropped. This method will block until other in-progress operations are complete. If Download is in progress, the caller should cancel its context to ensure the operation completes in a timely manner.

The method is idempotent. Calling it multiple times has no effect.

func (*CoreImpl) Download

func (c *CoreImpl) Download(ctx context.Context) error

Download retrieves all necessary data for processing from the network. Download will block until the data is successfully downloaded, and has no internal timeout. When Abandon is called, the caller must cancel the context passed in to shut down the operation; otherwise it may block indefinitely.

The method may only be called once. Calling it multiple times will return an error. Calling Download after Abandon is called will return an error.
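One way to enforce "call once, and never after Abandon" is a small mutex-protected guard. The sketch below shows the idea under that assumption; how CoreImpl actually enforces these rules is not documented here, and `guard`, `errAlreadyCalled`, and `errAbandoned` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Hypothetical sentinel errors; the package's real errors are not documented.
var (
	errAlreadyCalled = errors.New("step already called")
	errAbandoned     = errors.New("core abandoned")
)

// guard is a hypothetical helper tracking whether a step may start.
type guard struct {
	mu        sync.Mutex
	started   bool
	abandoned bool
}

// begin admits the first caller only, and nobody after abandon.
func (g *guard) begin() error {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.abandoned {
		return errAbandoned
	}
	if g.started {
		return errAlreadyCalled
	}
	g.started = true
	return nil
}

// abandon is idempotent: repeated calls leave the guard in the same state.
func (g *guard) abandon() {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.abandoned = true
}

// outcomes exercises the guard: first call, repeated call, call after abandon.
func outcomes() [3]bool {
	g := &guard{}
	first := g.begin() == nil
	second := errors.Is(g.begin(), errAlreadyCalled)
	g.abandon()
	g.abandon() // second call has no further effect
	third := errors.Is(g.begin(), errAbandoned)
	return [3]bool{first, second, third}
}

func main() {
	fmt.Println(outcomes())
}
```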

Expected error returns during normal operation:

  • context.Canceled: if the provided context was canceled before completion

func (*CoreImpl) Index

func (c *CoreImpl) Index() error

Index processes the downloaded data and stores it into in-memory indexes. Must be called after Download.

The method may only be called once. Calling it multiple times will return an error. Calling Index after Abandon is called will return an error.

No error returns are expected during normal operations

func (*CoreImpl) Persist

func (c *CoreImpl) Persist() error

Persist stores the indexed data in permanent storage. Must be called after Index.

The method may only be called once. Calling it multiple times will return an error. Calling Persist after Abandon is called will return an error.

No error returns are expected during normal operations

type Criteria

type Criteria struct {
	// AgreeingExecutorsCount is the number of receipts including the same ExecutionResult
	AgreeingExecutorsCount uint
	// RequiredExecutors is the list of EN node IDs, one of which must have produced the result
	RequiredExecutors flow.IdentifierList
}

Criteria defines the filtering criteria for execution result queries. It specifies requirements for execution result selection, including the number of agreeing executors and the required executor nodes.

func (*Criteria) OverrideWith

func (c *Criteria) OverrideWith(override Criteria) Criteria

OverrideWith overrides the original criteria with the incoming criteria, returning a new Criteria object. Fields from `override` criteria take precedence when set.
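The override behavior can be illustrated with a local copy of the struct. This sketch assumes that a field counts as "set" when it is non-zero (for the count) or non-empty (for the list); the documentation does not spell out that rule, so treat it as an assumption. `identifier` and `criteria` are local stand-ins for `flow.Identifier` and `Criteria`.

```go
package main

import "fmt"

// identifier stands in for flow.Identifier in this sketch.
type identifier [32]byte

// criteria mirrors the documented Criteria fields using local types.
type criteria struct {
	AgreeingExecutorsCount uint
	RequiredExecutors      []identifier
}

// overrideWith returns a new criteria value and leaves the receiver untouched.
// ASSUMPTION: "set" means non-zero count or non-empty list.
func (c criteria) overrideWith(override criteria) criteria {
	out := c
	if override.AgreeingExecutorsCount > 0 {
		out.AgreeingExecutorsCount = override.AgreeingExecutorsCount
	}
	if len(override.RequiredExecutors) > 0 {
		out.RequiredExecutors = override.RequiredExecutors
	}
	return out
}

func main() {
	base := criteria{AgreeingExecutorsCount: 2, RequiredExecutors: []identifier{{1}}}
	merged := base.overrideWith(criteria{AgreeingExecutorsCount: 3})
	// The count comes from the override; the executor list is kept from base.
	fmt.Println(merged.AgreeingExecutorsCount, len(merged.RequiredExecutors))
}
```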

type ExecutionResultQueryProvider

type ExecutionResultQueryProvider interface {
	// ExecutionResultQuery retrieves execution results and associated execution nodes for a given block ID
	// based on the provided criteria. It returns a Query containing the execution result and
	// the execution nodes that produced it.
	//
	// Expected errors during normal operations:
	//   - backend.InsufficientExecutionReceipts - found insufficient receipts for given block ID.
	ExecutionResultQuery(blockID flow.Identifier, criteria Criteria) (*Query, error)
}

ExecutionResultQueryProvider provides execution results and execution nodes based on criteria. It allows querying for execution results by block ID with specific filtering criteria to ensure consistency and reliability of execution results.

type ExecutionStateCache

type ExecutionStateCache interface {
	// Snapshot returns a view of the execution state as of the provided ExecutionResult.
	// The returned Snapshot provides access to execution state data for the fork ending
	// on the provided ExecutionResult which extends from the latest sealed result.
	// The result may be sealed or unsealed. Only data for finalized blocks is available.
	//
	// Expected errors during normal operation:
	//   - storage.ErrNotFound - result is not available, not ready for querying, or does not descend from the latest sealed result.
	//   - All other errors are potential indicators of bugs or corrupted internal state (continuation impossible)
	Snapshot(executionResultID flow.Identifier) (Snapshot, error)
}

ExecutionStateCache provides access to execution state snapshots for querying data at specific ExecutionResults.
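Snapshot's error contract separates one benign error from everything else, which callers can mirror with `errors.Is`. A minimal sketch follows; `errNotFound` stands in for `storage.ErrNotFound`, and `snapshotLookup` and `classify` are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotFound stands in for storage.ErrNotFound in this sketch.
var errNotFound = errors.New("not found")

// snapshotLookup is a hypothetical stand-in for ExecutionStateCache.Snapshot.
func snapshotLookup(available bool) error {
	if !available {
		return fmt.Errorf("result not ready: %w", errNotFound)
	}
	return nil
}

// classify separates the single expected error from unexpected ones.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, errNotFound):
		return "unavailable" // benign: result missing, not ready, or off-fork
	default:
		return "fatal" // potential bug or corrupted state
	}
}

func main() {
	fmt.Println(classify(snapshotLookup(true)))
	fmt.Println(classify(snapshotLookup(false)))
	fmt.Println(classify(errors.New("disk failure")))
}
```

Using `errors.Is` rather than `==` keeps the check working when the sentinel has been wrapped with additional context.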

type Pipeline

type Pipeline interface {
	PipelineStateProvider

	// Run starts the pipeline processing and blocks until completion or context cancellation.
	// CAUTION: not concurrency safe! Run must only be called once.
	//
	// Expected Errors:
	//   - context.Canceled: when the context is canceled
	//   - All other errors are potential indicators of bugs or corrupted internal state (continuation impossible)
	Run(ctx context.Context, core Core, parentState State) error

	// SetSealed marks the pipeline's result as sealed, which enables transitioning from StateWaitingPersist to StatePersisting.
	SetSealed()

	// OnParentStateUpdated updates the pipeline's parent's state.
	OnParentStateUpdated(parentState State)

	// Abandon marks the pipeline as abandoned.
	Abandon()
}

Pipeline represents a pipelined processing state machine for a single ExecutionResult. The state machine is initialized in the Pending state.

The state machine is designed to be run in a single goroutine. The Run method must only be called once.

type PipelineFactory

type PipelineFactory interface {
	NewPipeline(result *flow.ExecutionResult, isSealed bool) Pipeline
}

PipelineFactory is a factory object for creating new Pipeline instances.

type PipelineImpl

type PipelineImpl struct {
	// contains filtered or unexported fields
}

PipelineImpl implements the Pipeline interface.

func NewPipeline

func NewPipeline(
	log zerolog.Logger,
	executionResult *flow.ExecutionResult,
	isSealed bool,
	stateReceiver PipelineStateConsumer,
) *PipelineImpl

NewPipeline creates a new processing pipeline. The pipeline is initialized in the Pending state.

func (*PipelineImpl) Abandon

func (p *PipelineImpl) Abandon()

Abandon marks the pipeline as abandoned. This will cause the pipeline to eventually transition to the Abandoned state and halt processing.

func (*PipelineImpl) GetState

func (p *PipelineImpl) GetState() State

GetState returns the current state of the pipeline.

func (*PipelineImpl) OnParentStateUpdated

func (p *PipelineImpl) OnParentStateUpdated(parentState State)

OnParentStateUpdated updates the pipeline's state based on the provided parent state. If the parent state has changed, it will notify the state consumer and trigger a state change notification.

func (*PipelineImpl) Run

func (p *PipelineImpl) Run(ctx context.Context, core Core, parentState State) error

Run starts the pipeline processing and blocks until completion or context cancellation. CAUTION: not concurrency safe! Run must only be called once.

Expected Errors:

  • context.Canceled: when the context is canceled
  • All other errors are potential indicators of bugs or corrupted internal state (continuation impossible)
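Because Run blocks, a caller would usually start it in its own goroutine and treat `context.Canceled` as a normal shutdown. The sketch below illustrates that pattern with a hypothetical `run` function in place of a real pipeline; it is not the package's implementation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// run is a hypothetical stand-in for Pipeline.Run: it blocks until the
// context ends and returns the (wrapped) context error.
func run(ctx context.Context) error {
	<-ctx.Done()
	return fmt.Errorf("pipeline stopped: %w", ctx.Err())
}

// runAndCancel starts run in a single goroutine, cancels it, and reports
// whether the returned error is the expected context.Canceled.
func runAndCancel() bool {
	ctx, cancel := context.WithCancel(context.Background())
	errCh := make(chan error, 1) // buffered so the goroutine never leaks
	go func() { errCh <- run(ctx) }()
	cancel()
	return errors.Is(<-errCh, context.Canceled)
}

func main() {
	if runAndCancel() {
		fmt.Println("clean shutdown")
	} else {
		fmt.Println("unexpected error: treat as fatal")
	}
}
```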

func (*PipelineImpl) SetSealed

func (p *PipelineImpl) SetSealed()

SetSealed marks the execution result as sealed. This will cause the pipeline to eventually transition to the StateComplete state when the parent finishes processing.

type PipelineStateConsumer

type PipelineStateConsumer interface {
	// OnStateUpdated is called when a pipeline's state changes to notify the receiver of the new state.
	// This method will be called in the same goroutine that runs the pipeline, so it must not block.
	OnStateUpdated(newState State)
}

PipelineStateConsumer is a receiver of the pipeline state updates. PipelineStateConsumer implementations must be NON-BLOCKING and consume the state updates without noteworthy delay.
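One way to satisfy the non-blocking requirement is to hand updates to a buffered channel and never wait on it. The sketch below is an assumption-laden illustration: `chanConsumer` is a hypothetical type, `state` stands in for `State`, and dropping updates when the buffer is full is a design choice of this example rather than documented package behavior.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// state stands in for the package's State type.
type state int32

// chanConsumer is a hypothetical consumer backed by a buffered channel.
type chanConsumer struct {
	updates chan state
	dropped atomic.Int64 // number of updates discarded because the buffer was full
}

func newChanConsumer(size int) *chanConsumer {
	return &chanConsumer{updates: make(chan state, size)}
}

// OnStateUpdated never blocks: if the buffer is full, the update is counted
// as dropped instead of stalling the pipeline goroutine.
func (c *chanConsumer) OnStateUpdated(newState state) {
	select {
	case c.updates <- newState:
	default:
		c.dropped.Add(1)
	}
}

// demo sends three updates into a buffer of two and reports the result.
func demo() (queued int, dropped int64) {
	c := newChanConsumer(2)
	for s := state(0); s < 3; s++ {
		c.OnStateUpdated(s)
	}
	return len(c.updates), c.dropped.Load()
}

func main() {
	queued, dropped := demo()
	fmt.Println(queued, dropped)
}
```

A consumer that cannot tolerate lost updates would need a different strategy, such as keeping only the latest state, since every terminal transition may matter to the receiver.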

type PipelineStateProvider

type PipelineStateProvider interface {
	// GetState returns the current state of the pipeline.
	GetState() State
}

PipelineStateProvider is an interface that provides a pipeline's state.

type Query

type Query struct {
	// ExecutionResult is the execution result for the queried block
	ExecutionResult *flow.ExecutionResult
	// ExecutionNodes is the list of execution node identities that produced the result
	ExecutionNodes flow.IdentitySkeletonList
}

Query contains the result of an execution result query. It includes both the execution result and the execution nodes that produced it.

type Snapshot

type Snapshot interface {
	// Events returns a reader for querying event data.
	Events() storage.EventsReader

	// Collections returns a reader for querying collection data.
	Collections() storage.CollectionsReader

	// Transactions returns a reader for querying transaction data.
	Transactions() storage.TransactionsReader

	// LightTransactionResults returns a reader for querying light transaction result data.
	LightTransactionResults() storage.LightTransactionResultsReader

	// TransactionResultErrorMessages returns a reader for querying transaction error message data.
	TransactionResultErrorMessages() storage.TransactionResultErrorMessagesReader

	// Registers returns a reader for querying register data.
	Registers() storage.RegisterIndexReader
}

Snapshot provides access to execution data readers for querying various data types from a specific ExecutionResult.

type State

type State int32

State represents the state of the processing pipeline.

const (
	// StatePending is the initial state after instantiation, before Run is called
	StatePending State = iota
	// StateProcessing represents the state where data processing (download and indexing) has been started
	StateProcessing
	// StateWaitingPersist represents the state where all data is indexed, but conditions to persist are not met
	StateWaitingPersist
	// StateComplete represents the state where all data is persisted to storage
	StateComplete
	// StateAbandoned represents the state where processing was aborted
	StateAbandoned
)

func (State) IsTerminal

func (s State) IsTerminal() bool

IsTerminal returns true if the state is a terminal state (Complete or Abandoned).

func (State) String

func (s State) String() string

String representation of states for logging
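The two methods can be sketched against a local copy of the constants. The terminal check follows the documentation (Complete or Abandoned); the string labels are assumptions, since the actual output of String is not shown here.

```go
package main

import "fmt"

// state mirrors the documented State constants with local, unexported names.
type state int32

const (
	statePending state = iota
	stateProcessing
	stateWaitingPersist
	stateComplete
	stateAbandoned
)

// isTerminal reports whether no further transitions are possible.
func (s state) isTerminal() bool {
	return s == stateComplete || s == stateAbandoned
}

// String returns a label for logging. The labels are ASSUMED for this sketch.
func (s state) String() string {
	switch s {
	case statePending:
		return "pending"
	case stateProcessing:
		return "processing"
	case stateWaitingPersist:
		return "waiting_persist"
	case stateComplete:
		return "complete"
	case stateAbandoned:
		return "abandoned"
	default:
		return fmt.Sprintf("unknown(%d)", int32(s))
	}
}

func main() {
	for s := statePending; s <= stateAbandoned; s++ {
		fmt.Printf("%s terminal=%t\n", s, s.isTerminal())
	}
}
```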
