pipeline

package
v0.42.2-compilervm-test-5
Published: Jul 9, 2025 License: AGPL-3.0 Imports: 17 Imported by: 0

Documentation

Constants

const DefaultTxResultErrMsgsRequestTimeout = 5 * time.Second
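
The documentation does not say how this constant is consumed. Its name, together with the txResultErrMsgsRequestTimeout parameter of NewCoreImpl, suggests it bounds the request for transaction result error messages. The following is a minimal sketch of that pattern using only the standard library; fetchWithTimeout and timedOut are hypothetical helpers, not part of this package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Redeclared with the documented value so the sketch compiles on its own.
const DefaultTxResultErrMsgsRequestTimeout = 5 * time.Second

// fetchWithTimeout is a hypothetical helper: it runs fetch under a context
// that is canceled when the timeout elapses or the parent is canceled.
func fetchWithTimeout(parent context.Context, timeout time.Duration, fetch func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(parent, timeout)
	defer cancel()
	return fetch(ctx)
}

// timedOut simulates a request that never answers and reports whether the
// timeout cut it off. A short timeout keeps the demonstration fast.
func timedOut() bool {
	err := fetchWithTimeout(context.Background(), 10*time.Millisecond, func(ctx context.Context) error {
		<-ctx.Done() // block until the timeout fires
		return ctx.Err()
	})
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println("default timeout:", DefaultTxResultErrMsgsRequestTimeout)
	fmt.Println("slow request timed out:", timedOut())
}
```

In real use the default would be passed in place of the short demonstration timeout.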

Variables

This section is empty.

Functions

This section is empty.

Types

type Core

type Core interface {
	// Download retrieves all necessary data for processing.
	// CAUTION: not concurrency safe!
	//
	// Expected errors:
	// - context.Canceled: if the provided context was canceled before completion
	// - context.DeadlineExceeded: if the provided context's deadline was reached before completion
	// - All other errors are potential indicators of bugs or corrupted internal state (continuation impossible)
	Download(ctx context.Context) error

	// Index processes the downloaded data and creates in-memory indexes.
	// CAUTION: not concurrency safe!
	//
	// No errors are expected during normal operations
	Index() error

	// Persist stores the indexed data in permanent storage.
	// CAUTION: not concurrency safe!
	//
	// No errors are expected during normal operations
	Persist() error

	// Abandon indicates that the protocol has abandoned this state. Hence processing will be aborted
	// and any data dropped.
	// CAUTION: The Core instance should not be used after Abandon is called as it could cause panic due to cleared data.
	// CAUTION: not concurrency safe!
	//
	// No errors are expected during normal operations
	Abandon() error
}

Core defines the interface for pipeline processing steps. Each implementation should handle execution data and implement the three-phase processing: download, index, and persist. CAUTION: The Core instance should not be used after Abandon is called, as it could cause a panic due to cleared data.
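
The three-phase contract can be illustrated with a small test double. This is a sketch under stated assumptions: the Core interface is redeclared locally so the example compiles on its own, and recordingCore, processOnce, and runDemo are hypothetical names that do not exist in this package. Because the methods are documented as not concurrency safe, the sketch drives them sequentially from a single goroutine.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Core mirrors the documented interface; redeclared here for self-containment.
type Core interface {
	Download(ctx context.Context) error
	Index() error
	Persist() error
	Abandon() error
}

// recordingCore is a hypothetical test double that records the call order.
type recordingCore struct {
	calls []string
}

func (c *recordingCore) Download(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err // surfaces context.Canceled or context.DeadlineExceeded
	}
	c.calls = append(c.calls, "download")
	return nil
}

func (c *recordingCore) Index() error   { c.calls = append(c.calls, "index"); return nil }
func (c *recordingCore) Persist() error { c.calls = append(c.calls, "persist"); return nil }
func (c *recordingCore) Abandon() error { c.calls = nil; return nil }

// processOnce runs the three phases in order and stops at the first error.
func processOnce(ctx context.Context, core Core) error {
	if err := core.Download(ctx); err != nil {
		return fmt.Errorf("download: %w", err)
	}
	if err := core.Index(); err != nil {
		return fmt.Errorf("index: %w", err)
	}
	if err := core.Persist(); err != nil {
		return fmt.Errorf("persist: %w", err)
	}
	return nil
}

// runDemo returns the recorded call order as a comma-separated string.
func runDemo() string {
	core := &recordingCore{}
	if err := processOnce(context.Background(), core); err != nil {
		return "error: " + err.Error()
	}
	return strings.Join(core.calls, ",")
}

func main() {
	fmt.Println(runDemo())
}
```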

type CoreImpl

type CoreImpl struct {
	// contains filtered or unexported fields
}

CoreImpl implements the Core interface for processing execution data. It coordinates the download, indexing, and persisting of execution data. CAUTION: The CoreImpl instance should not be used after Abandon is called as it could cause panic due to cleared data. CAUTION: not concurrency safe!

func NewCoreImpl

func NewCoreImpl(
	logger zerolog.Logger,
	executionResult *flow.ExecutionResult,
	header *flow.Header,
	execDataRequester requester.ExecutionDataRequester,
	txResultErrMsgsRequester tx_error_messages.Requester,
	txResultErrMsgsRequestTimeout time.Duration,
	persistentRegisters storage.RegisterIndex,
	persistentEvents storage.Events,
	persistentCollections storage.Collections,
	persistentTransactions storage.Transactions,
	persistentResults storage.LightTransactionResults,
	persistentTxResultErrMsg storage.TransactionResultErrorMessages,
	latestPersistedSealedResult storage.LatestPersistedSealedResult,
	protocolDB storage.DB,
) *CoreImpl

NewCoreImpl creates a new CoreImpl with all necessary dependencies. CAUTION: not concurrency safe!

func (*CoreImpl) Abandon

func (c *CoreImpl) Abandon() error

Abandon indicates that the protocol has abandoned this state. Hence processing will be aborted and any data dropped. CAUTION: The CoreImpl instance should not be used after Abandon is called as it could cause panic due to cleared data. CAUTION: not concurrency safe!

No errors are expected during normal operations

func (*CoreImpl) Download

func (c *CoreImpl) Download(ctx context.Context) error

Download downloads the execution data and transaction result error messages for the block. CAUTION: not concurrency safe!

Expected errors:

  • context.Canceled: if the provided context was canceled before completion
  • context.DeadlineExceeded: if the provided context's deadline was reached before completion
  • All other errors are potential indicators of bugs or corrupted internal state (continuation impossible)
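
A caller can separate the expected errors from the unexpected ones with errors.Is, which also matches wrapped errors. The sketch below uses only the standard library; classifyDownloadErr and demoClassify are hypothetical helpers, and how a real caller reacts to each class is an assumption left open here.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// classifyDownloadErr is a hypothetical helper that sorts an error returned
// by Download into the categories listed in the documentation.
func classifyDownloadErr(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, context.Canceled):
		return "canceled"
	case errors.Is(err, context.DeadlineExceeded):
		return "deadline exceeded"
	default:
		// Documented as a potential bug or corrupted state: do not continue.
		return "unexpected"
	}
}

// demoClassify runs the helper over one example of each category.
func demoClassify() []string {
	wrapped := fmt.Errorf("download failed: %w", context.Canceled)
	return []string{
		classifyDownloadErr(nil),
		classifyDownloadErr(wrapped),
		classifyDownloadErr(context.DeadlineExceeded),
		classifyDownloadErr(errors.New("corrupted state")),
	}
}

func main() {
	for _, label := range demoClassify() {
		fmt.Println(label)
	}
}
```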

func (*CoreImpl) Index

func (c *CoreImpl) Index() error

Index retrieves the downloaded execution data and transaction results error messages from the caches and indexes them into in-memory storage. CAUTION: not concurrency safe!

No errors are expected during normal operations

func (*CoreImpl) Persist

func (c *CoreImpl) Persist() error

Persist persists the indexed data to permanent storage atomically. CAUTION: not concurrency safe!

No errors are expected during normal operations

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline represents a generic processing pipeline with state transitions. It processes data through sequential states: Ready -> Downloading -> Indexing -> WaitingPersist -> Persisting -> Complete, with conditions for each transition.
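
The documented state sequence can be sketched as a small transition function. This is a simplified model, not the package's implementation: nextState and walk are hypothetical, and the only transition condition modeled is the sealing requirement between WaitingPersist and Persisting. The real pipeline applies conditions to every transition.

```go
package main

import "fmt"

// State and its constants mirror the documented declarations.
type State int

const (
	StateReady State = iota
	StateDownloading
	StateIndexing
	StateWaitingPersist
	StatePersisting
	StateComplete
	StateCanceled
)

// nextState is a hypothetical helper returning the successor of s along the
// documented sequence. An unsealed pipeline waits in StateWaitingPersist.
func nextState(s State, sealed bool) State {
	switch s {
	case StateReady:
		return StateDownloading
	case StateDownloading:
		return StateIndexing
	case StateIndexing:
		return StateWaitingPersist
	case StateWaitingPersist:
		if sealed {
			return StatePersisting
		}
		return StateWaitingPersist
	case StatePersisting:
		return StateComplete
	default:
		return s // StateComplete and StateCanceled are terminal
	}
}

// walk applies nextState the given number of times, starting from StateReady.
func walk(sealed bool, steps int) State {
	s := StateReady
	for i := 0; i < steps; i++ {
		s = nextState(s, sealed)
	}
	return s
}

func main() {
	fmt.Println("unsealed pipeline stops waiting to persist:", walk(false, 10) == StateWaitingPersist)
	fmt.Println("sealed pipeline completes:", walk(true, 10) == StateComplete)
}
```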

func NewPipeline

func NewPipeline(
	logger zerolog.Logger,
	isSealed bool,
	executionResult *flow.ExecutionResult,
	core Core,
	stateUpdatePublisher StateUpdatePublisher,
) *Pipeline

NewPipeline creates a new processing pipeline. Pipelines must only be created for ExecutionResults that descend from the latest persisted sealed result. The pipeline is initialized in the Ready state.

Parameters:

  • logger: the logger to use for the pipeline
  • isSealed: indicates if the pipeline's ExecutionResult is sealed
  • executionResult: the execution result processed by the pipeline
  • core: implements the processing logic for the pipeline
  • stateUpdatePublisher: called when the pipeline needs to broadcast state updates

Returns:

  • new pipeline object

func (*Pipeline) GetState

func (p *Pipeline) GetState() State

GetState returns the current state of the pipeline.

func (*Pipeline) Run

func (p *Pipeline) Run(parentCtx context.Context) error

Run starts the pipeline processing and blocks until completion or context cancellation.

This function handles the progression through the pipeline states, executing the appropriate processing functions at each step.

When the pipeline reaches a terminal state (StateComplete or StateCanceled), the function returns. The function will also return if the provided context is canceled.

Returns an error if any processing step fails with an irrecoverable error. Returns nil if processing completes successfully, reaches a terminal state, or if either the parent or pipeline context is canceled.

func (*Pipeline) SetSealed

func (p *Pipeline) SetSealed()

SetSealed marks the data as sealed, which enables transitioning from StateWaitingPersist to StatePersisting.

func (*Pipeline) UpdateState

func (p *Pipeline) UpdateState(update StateUpdate)

UpdateState updates the pipeline's state based on the provided state update.

type State

type State int

State represents the state of the processing pipeline

const (
	// StateReady is the initial state after instantiation and before downloading has begun
	StateReady State = iota
	// StateDownloading represents the state where data download is in progress
	StateDownloading
	// StateIndexing represents the state where data is being indexed
	StateIndexing
	// StateWaitingPersist represents the state where all data is indexed, but conditions to persist are not met
	StateWaitingPersist
	// StatePersisting represents the state where the indexed data is being persisted to storage
	StatePersisting
	// StateComplete represents the state where all data is persisted to storage
	StateComplete
	// StateCanceled represents the state where processing was aborted
	StateCanceled
)

func (State) String

func (s State) String() string

String returns a string representation of the state, intended for logging.
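
A String method of this kind is typically a switch over the constants. The sketch below shows the pattern; the label strings are assumptions chosen for illustration, since the documentation does not list the strings the package actually returns.

```go
package main

import "fmt"

// State and its constants mirror the documented declarations.
type State int

const (
	StateReady State = iota
	StateDownloading
	StateIndexing
	StateWaitingPersist
	StatePersisting
	StateComplete
	StateCanceled
)

// String implements fmt.Stringer. The labels are hypothetical.
func (s State) String() string {
	switch s {
	case StateReady:
		return "ready"
	case StateDownloading:
		return "downloading"
	case StateIndexing:
		return "indexing"
	case StateWaitingPersist:
		return "waiting_persist"
	case StatePersisting:
		return "persisting"
	case StateComplete:
		return "complete"
	case StateCanceled:
		return "canceled"
	default:
		// Keep unknown values visible in logs instead of hiding them.
		return fmt.Sprintf("unknown(%d)", int(s))
	}
}

func main() {
	// The %s verb invokes String automatically.
	fmt.Printf("state=%s\n", StateIndexing)
}
```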

type StateUpdate

type StateUpdate struct {
	// DescendsFromLastPersistedSealed indicates if this pipeline descends from
	// the last persisted sealed result
	DescendsFromLastPersistedSealed bool
	// ParentState contains the state information from the parent pipeline
	ParentState State
}

StateUpdate contains state update information

type StateUpdatePublisher

type StateUpdatePublisher func(update StateUpdate)

StateUpdatePublisher is a function that publishes state updates
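
Because StateUpdatePublisher is a plain function type, any function or closure with a matching signature can serve as one. The sketch below builds a publisher that forwards each update to a set of receivers. The types are redeclared locally for self-containment, and fanOut and demoFanOut are hypothetical helpers; how the package actually wires pipelines together is not described in this documentation.

```go
package main

import "fmt"

// State, StateUpdate, and StateUpdatePublisher mirror the documented declarations.
type State int

const (
	StateReady State = iota
	StateDownloading
	StateIndexing
	StateWaitingPersist
	StatePersisting
	StateComplete
	StateCanceled
)

type StateUpdate struct {
	DescendsFromLastPersistedSealed bool
	ParentState                     State
}

type StateUpdatePublisher func(update StateUpdate)

// fanOut is a hypothetical helper: it returns a publisher that forwards
// every update to each receiver in order.
func fanOut(receivers ...func(StateUpdate)) StateUpdatePublisher {
	return func(update StateUpdate) {
		for _, receive := range receivers {
			receive(update)
		}
	}
}

// demoFanOut publishes one update to two recording receivers.
func demoFanOut() []StateUpdate {
	var received []StateUpdate
	record := func(u StateUpdate) { received = append(received, u) }

	publish := fanOut(record, record)
	publish(StateUpdate{DescendsFromLastPersistedSealed: true, ParentState: StateIndexing})
	return received
}

func main() {
	for _, u := range demoFanOut() {
		fmt.Printf("descends=%t parentState=%d\n", u.DescendsFromLastPersistedSealed, u.ParentState)
	}
}
```

The method value of a pipeline's UpdateState has the same signature, func(StateUpdate), so it could be passed as a receiver in a sketch like this one.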
