pipeline

package
Version: v0.44.0-unsafe-collect...
Published: Dec 5, 2025 License: AGPL-3.0 Imports: 27 Imported by: 0

Documentation

Constants

const DefaultTxResultErrMsgsRequestTimeout = 5 * time.Second

DefaultTxResultErrMsgsRequestTimeout is the default timeout for requesting transaction result error messages.

Variables

This section is empty.

Functions

func NewMockStateConsumer

func NewMockStateConsumer() *mockStateConsumer

NewMockStateConsumer creates a new instance of mockStateConsumer with a buffered channel.

func NewMockStateProvider

func NewMockStateProvider() *mockStateProvider

NewMockStateProvider initializes a mockStateProvider with the default state StatePending.

Types

type Core

type Core struct {
	// contains filtered or unexported fields
}

Core implements the Core interface for processing execution data. It coordinates the download, indexing, and persisting of execution data.

Safe for concurrent use.

func NewCore

func NewCore(
	logger zerolog.Logger,
	executionResult *flow.ExecutionResult,
	block *flow.Block,
	execDataRequester requester.ExecutionDataRequester,
	txResultErrMsgsRequester tx_error_messages.Requester,
	txResultErrMsgsRequestTimeout time.Duration,
	persistentRegisters storage.RegisterIndex,
	persistentEvents storage.Events,
	persistentCollections storage.Collections,
	persistentResults storage.LightTransactionResults,
	persistentTxResultErrMsg storage.TransactionResultErrorMessages,
	latestPersistedSealedResult storage.LatestPersistedSealedResult,
	protocolDB storage.DB,
	lockManager storage.LockManager,
) (*Core, error)

NewCore creates a new Core with all necessary dependencies. Safe for concurrent use.

No error returns are expected during normal operations.

func (*Core) Abandon

func (c *Core) Abandon()

Abandon indicates that the protocol has abandoned this state. Hence processing will be aborted and any data dropped. This method will block until other in-progress operations are complete. If Download is in progress, the caller should cancel its context to ensure the operation completes in a timely manner.

The method is idempotent. Calling it multiple times has no effect.

func (*Core) Download

func (c *Core) Download(ctx context.Context) error

Download retrieves all necessary data for processing from the network. Download blocks until the data is successfully downloaded and has no internal timeout. When Abandon is called, the caller must cancel the context passed to Download to shut down the operation; otherwise it may block indefinitely.

The method may only be called once. Calling it multiple times will return an error. Calling Download after Abandon is called will return an error.

Expected error returns during normal operation:

  • context.Canceled: if the provided context was canceled before completion
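The interaction between Download and Abandon can be sketched as follows: cancel the download context first, then call Abandon, and treat context.Canceled as the expected outcome. The downloader interface and the blockingCore stand-in are hypothetical. They only mirror the documented method signatures so that the example runs without the real dependencies.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// downloader is a hypothetical interface covering the two documented methods used here.
type downloader interface {
	Download(ctx context.Context) error
	Abandon()
}

// blockingCore is a stand-in whose Download blocks until its context is canceled.
type blockingCore struct{ abandoned bool }

func (b *blockingCore) Download(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

func (b *blockingCore) Abandon() { b.abandoned = true }

// abandonDownload starts a download, cancels its context, and then calls Abandon.
// It reports whether the download ended with the expected context.Canceled error.
func abandonDownload(d downloader) (expected bool, err error) {
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan error, 1)
	go func() { done <- d.Download(ctx) }()

	cancel()    // unblock Download first, since it has no internal timeout
	d.Abandon() // then abandon; the real Abandon waits for in-progress operations
	err = <-done
	return errors.Is(err, context.Canceled), err
}

func main() {
	expected, err := abandonDownload(&blockingCore{})
	fmt.Println(expected, err)
}
```

Canceling before Abandon matters because Abandon is documented to block until in-progress operations complete.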

func (*Core) Index

func (c *Core) Index() error

Index processes the downloaded data and stores it into in-memory indexes. Must be called after Download.

The method may only be called once. Calling it multiple times will return an error. Calling Index after Abandon is called will return an error.

No error returns are expected during normal operations.

func (*Core) Persist

func (c *Core) Persist() error

Persist stores the indexed data in permanent storage. Must be called after Index.

The method may only be called once. Calling it multiple times will return an error. Calling Persist after Abandon is called will return an error.

No error returns are expected during normal operations.

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline implements the Pipeline interface.

func NewPipeline

func NewPipeline(
	log zerolog.Logger,
	executionResult *flow.ExecutionResult,
	isSealed bool,
	stateReceiver optimistic_sync.PipelineStateConsumer,
) *Pipeline

NewPipeline creates a new processing pipeline. The pipeline is initialized in the Pending state.

func (*Pipeline) Abandon

func (p *Pipeline) Abandon()

Abandon marks the pipeline as abandoned. This will cause the pipeline to eventually transition to the Abandoned state and halt processing.

func (*Pipeline) GetState

func (p *Pipeline) GetState() optimistic_sync.State

GetState returns the current state of the pipeline.

func (*Pipeline) OnParentStateUpdated

func (p *Pipeline) OnParentStateUpdated(parentState optimistic_sync.State)

OnParentStateUpdated updates the pipeline's state based on the provided parent state. If the parent state has changed, it will notify the state consumer and trigger a state change notification.

func (*Pipeline) Run

func (p *Pipeline) Run(ctx context.Context, core optimistic_sync.Core, parentState optimistic_sync.State) error

Run starts the pipeline processing and blocks until completion or context cancellation. CAUTION: not concurrency safe! Run must only be called once.

Expected Errors:

  • context.Canceled: when the context is canceled
  • All other errors are potential indicators of bugs or corrupted internal state (continuation impossible)

func (*Pipeline) SetSealed

func (p *Pipeline) SetSealed()

SetSealed marks the execution result as sealed. This will cause the pipeline to eventually transition to the StateComplete state when the parent finishes processing.
