backend

package
v0.42.4-pebble.4-fix-a... Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 22, 2025 License: AGPL-3.0 Imports: 29 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func HandleRPCSubscription added in v0.40.0

func HandleRPCSubscription[T any](sub subscription.Subscription, handleResponse func(resp T) error) error

HandleRPCSubscription is a generic handler for subscriptions to a specific type for rpc calls.

Parameters: - sub: The subscription. - handleResponse: The function responsible for handling the response of the subscribed type.

Expected errors during normal operation:

  • codes.Internal: If the subscription encounters an error or gets an unexpected response.

Types

type AccountStatusesBackend added in v0.33.30

type AccountStatusesBackend struct {
	// contains filtered or unexported fields
}

AccountStatusesBackend is a struct representing a backend implementation for subscribing to account statuses changes.

func (*AccountStatusesBackend) SubscribeAccountStatusesFromLatestBlock added in v0.33.30

func (b *AccountStatusesBackend) SubscribeAccountStatusesFromLatestBlock(
	ctx context.Context,
	filter state_stream.AccountStatusFilter,
) subscription.Subscription

SubscribeAccountStatusesFromLatestBlock subscribes to the streaming of account status changes starting from a latest sealed block, with an optional status filter.

No errors are expected during normal operation.

func (*AccountStatusesBackend) SubscribeAccountStatusesFromStartBlockID added in v0.33.30

func (b *AccountStatusesBackend) SubscribeAccountStatusesFromStartBlockID(
	ctx context.Context,
	startBlockID flow.Identifier,
	filter state_stream.AccountStatusFilter,
) subscription.Subscription

SubscribeAccountStatusesFromStartBlockID subscribes to the streaming of account status changes starting from a specific block ID with an optional status filter. Errors: - codes.ErrNotFound if could not get block by start blockID. - codes.Internal if there is an internal error.

func (*AccountStatusesBackend) SubscribeAccountStatusesFromStartHeight added in v0.33.30

func (b *AccountStatusesBackend) SubscribeAccountStatusesFromStartHeight(
	ctx context.Context,
	startHeight uint64,
	filter state_stream.AccountStatusFilter,
) subscription.Subscription

SubscribeAccountStatusesFromStartHeight subscribes to the streaming of account status changes starting from a specific block height, with an optional status filter. Errors: - codes.ErrNotFound if could not get block by start height. - codes.Internal if there is an internal error.

type AccountStatusesResponse added in v0.33.30

type AccountStatusesResponse struct {
	BlockID       flow.Identifier
	Height        uint64
	AccountEvents map[string]flow.EventsList
}

type Config

type Config struct {
	state_stream.EventFilterConfig

	// ListenAddr is the address the GRPC server will listen on as host:port
	ListenAddr string

	// MaxExecutionDataMsgSize is the max message size for block execution data API
	MaxExecutionDataMsgSize uint

	// RpcMetricsEnabled specifies whether to enable the GRPC metrics
	RpcMetricsEnabled bool

	// MaxGlobalStreams defines the global max number of streams that can be open at the same time.
	MaxGlobalStreams uint32

	// RegisterIDsRequestLimit defines the max number of register IDs that can be received in a single request.
	RegisterIDsRequestLimit uint32

	// ExecutionDataCacheSize is the max number of objects for the execution data cache.
	ExecutionDataCacheSize uint32

	// ClientSendTimeout is the timeout for sending a message to the client. After the timeout,
	// the stream is closed with an error.
	ClientSendTimeout time.Duration

	// ClientSendBufferSize is the size of the response buffer for sending messages to the client.
	ClientSendBufferSize uint

	// ResponseLimit is the max responses per second allowed on a stream. After exceeding the limit,
	// the stream is paused until more capacity is available. Searches of past data can be CPU
	// intensive, so this helps manage the impact.
	ResponseLimit float64

	// HeartbeatInterval specifies the block interval at which heartbeat messages should be sent.
	HeartbeatInterval uint64
}

Config defines the configurable options for the ingress server.

type Engine

type Engine struct {
	*component.ComponentManager
	// contains filtered or unexported fields
}

Engine exposes the server with the state stream API. By default, this engine is not enabled. In order to run this engine a port for the GRPC server to be served on should be specified in the run config.

func NewEng

func NewEng(
	log zerolog.Logger,
	config Config,
	execDataCache *cache.ExecutionDataCache,
	headers storage.Headers,
	chainID flow.ChainID,
	server *grpcserver.GrpcServer,
	backend *StateStreamBackend,
) (*Engine, error)

NewEng returns a new ingress server.

type EventsBackend

type EventsBackend struct {
	// contains filtered or unexported fields
}

func (*EventsBackend) SubscribeEvents

func (b *EventsBackend) SubscribeEvents(ctx context.Context, startBlockID flow.Identifier, startHeight uint64, filter state_stream.EventFilter) subscription.Subscription

SubscribeEvents is deprecated and will be removed in a future version. Use SubscribeEventsFromStartBlockID, SubscribeEventsFromStartHeight or SubscribeEventsFromLatest.

SubscribeEvents streams events for all blocks starting at the specified block ID or block height up until the latest available block. Once the latest is reached, the stream will remain open and responses are sent for each new block as it becomes available.

Only one of startBlockID and startHeight may be set. If neither startBlockID nor startHeight is provided, the latest sealed block is used.

Events within each block are filtered by the provided EventFilter, and only those events that match the filter are returned. If no filter is provided, all events are returned.

Parameters: - ctx: Context for the operation. - startBlockID: The identifier of the starting block. If provided, startHeight should be 0. - startHeight: The height of the starting block. If provided, startBlockID should be flow.ZeroID. - filter: The event filter used to filter events.

If invalid parameters will be supplied SubscribeEvents will return a failed subscription.

func (*EventsBackend) SubscribeEventsFromLatest added in v0.33.30

func (b *EventsBackend) SubscribeEventsFromLatest(ctx context.Context, filter state_stream.EventFilter) subscription.Subscription

SubscribeEventsFromLatest subscribes to events starting at the latest sealed block, up until the latest available block. Once the latest is reached, the stream will remain open and responses are sent for each new block as it becomes available.

Events within each block are filtered by the provided EventFilter, and only those events that match the filter are returned. If no filter is provided, all events are returned.

Parameters: - ctx: Context for the operation. - filter: The event filter used to filter events.

If invalid parameters will be supplied SubscribeEventsFromLatest will return a failed subscription.

func (*EventsBackend) SubscribeEventsFromStartBlockID added in v0.33.30

func (b *EventsBackend) SubscribeEventsFromStartBlockID(ctx context.Context, startBlockID flow.Identifier, filter state_stream.EventFilter) subscription.Subscription

SubscribeEventsFromStartBlockID streams events starting at the specified block ID, up until the latest available block. Once the latest is reached, the stream will remain open and responses are sent for each new block as it becomes available.

Events within each block are filtered by the provided EventFilter, and only those events that match the filter are returned. If no filter is provided, all events are returned.

Parameters: - ctx: Context for the operation. - startBlockID: The identifier of the starting block. - filter: The event filter used to filter events.

If invalid parameters will be supplied SubscribeEventsFromStartBlockID will return a failed subscription.

func (*EventsBackend) SubscribeEventsFromStartHeight added in v0.33.30

func (b *EventsBackend) SubscribeEventsFromStartHeight(ctx context.Context, startHeight uint64, filter state_stream.EventFilter) subscription.Subscription

SubscribeEventsFromStartHeight streams events starting at the specified block height, up until the latest available block. Once the latest is reached, the stream will remain open and responses are sent for each new block as it becomes available.

Events within each block are filtered by the provided EventFilter, and only those events that match the filter are returned. If no filter is provided, all events are returned.

Parameters: - ctx: Context for the operation. - startHeight: The height of the starting block. - filter: The event filter used to filter events.

If invalid parameters will be supplied SubscribeEventsFromStartHeight will return a failed subscription.

type EventsProvider added in v0.43.0

type EventsProvider struct {
	// contains filtered or unexported fields
}

EventsProvider retrieves events by block height. It can be configured to retrieve events from the events indexer(if available) or using a dedicated callback to query it from other sources.

func (*EventsProvider) GetAllEventsResponse added in v0.43.0

func (b *EventsProvider) GetAllEventsResponse(ctx context.Context, height uint64) (*EventsResponse, error)

GetAllEventsResponse returns a function that retrieves the event response for a given block height. Expected errors: - codes.NotFound: If block header for the specified block height is not found. - error: An error, if any, encountered during getting events from storage or execution data.

type EventsResponse

type EventsResponse struct {
	BlockID        flow.Identifier
	Height         uint64
	Events         flow.EventsList
	BlockTimestamp time.Time
}

EventsResponse represents the response containing events for a specific block.

type ExecutionDataBackend

type ExecutionDataBackend struct {
	// contains filtered or unexported fields
}

func (*ExecutionDataBackend) GetExecutionDataByBlockID

func (b *ExecutionDataBackend) GetExecutionDataByBlockID(ctx context.Context, blockID flow.Identifier) (*execution_data.BlockExecutionData, error)

func (*ExecutionDataBackend) SubscribeExecutionData

func (b *ExecutionDataBackend) SubscribeExecutionData(ctx context.Context, startBlockID flow.Identifier, startHeight uint64) subscription.Subscription

SubscribeExecutionData is deprecated and will be removed in future versions. Use SubscribeExecutionDataFromStartBlockID, SubscribeExecutionDataFromStartBlockHeight or SubscribeExecutionDataFromLatest.

SubscribeExecutionData streams execution data for all blocks starting at the specified block ID or block height up until the latest available block. Once the latest is reached, the stream will remain open and responses are sent for each new block as it becomes available.

Only one of startBlockID and startHeight may be set. If neither startBlockID nor startHeight is provided, the latest sealed block is used.

Parameters: - ctx: Context for the operation. - startBlockID: The identifier of the starting block. If provided, startHeight should be 0. - startHeight: The height of the starting block. If provided, startBlockID should be flow.ZeroID.

If invalid parameters are provided, failed subscription will be returned.

func (*ExecutionDataBackend) SubscribeExecutionDataFromLatest added in v0.33.30

func (b *ExecutionDataBackend) SubscribeExecutionDataFromLatest(ctx context.Context) subscription.Subscription

SubscribeExecutionDataFromLatest streams execution data starting at the latest block. Once the latest is reached, the stream will remain open and responses are sent for each new block as it becomes available.

Parameters: - ctx: Context for the operation.

If invalid parameters are provided, failed subscription will be returned.

func (*ExecutionDataBackend) SubscribeExecutionDataFromStartBlockHeight added in v0.33.30

func (b *ExecutionDataBackend) SubscribeExecutionDataFromStartBlockHeight(ctx context.Context, startBlockHeight uint64) subscription.Subscription

SubscribeExecutionDataFromStartBlockHeight streams execution data for all blocks starting at the specified block height up until the latest available block. Once the latest is reached, the stream will remain open and responses are sent for each new block as it becomes available.

Parameters: - ctx: Context for the operation. - startHeight: The height of the starting block.

If invalid parameters are provided, failed subscription will be returned.

func (*ExecutionDataBackend) SubscribeExecutionDataFromStartBlockID added in v0.33.30

func (b *ExecutionDataBackend) SubscribeExecutionDataFromStartBlockID(ctx context.Context, startBlockID flow.Identifier) subscription.Subscription

SubscribeExecutionDataFromStartBlockID streams execution data for all blocks starting at the specified block ID up until the latest available block. Once the latest is reached, the stream will remain open and responses are sent for each new block as it becomes available.

Parameters: - ctx: Context for the operation. - startBlockID: The identifier of the starting block.

If invalid parameters are provided, failed subscription will be returned.

type ExecutionDataResponse

type ExecutionDataResponse struct {
	Height         uint64
	ExecutionData  *execution_data.BlockExecutionData
	BlockTimestamp time.Time
}

type Handler

type Handler struct {
	subscription.StreamingData
	// contains filtered or unexported fields
}

func NewHandler

func NewHandler(api state_stream.API, chain flow.Chain, config Config) *Handler

func (*Handler) SubscribeAccountStatusesFromLatestBlock added in v0.33.30

SubscribeAccountStatusesFromLatestBlock streams account statuses for all blocks starting at the last sealed block, up until the latest available block. Once the latest is reached, the stream will remain open and responses are sent for each new block as it becomes available.

func (*Handler) SubscribeAccountStatusesFromStartBlockID added in v0.33.30

SubscribeAccountStatusesFromStartBlockID streams account statuses for all blocks starting at the requested start block ID, up until the latest available block. Once the latest is reached, the stream will remain open and responses are sent for each new block as it becomes available.

func (*Handler) SubscribeAccountStatusesFromStartHeight added in v0.33.30

SubscribeAccountStatusesFromStartHeight streams account statuses for all blocks starting at the requested start block height, up until the latest available block. Once the latest is reached, the stream will remain open and responses are sent for each new block as it becomes available.

func (*Handler) SubscribeEvents

SubscribeEvents is deprecated and will be removed in a future version. Use SubscribeEventsFromStartBlockID, SubscribeEventsFromStartHeight or SubscribeEventsFromLatest.

SubscribeEvents handles subscription requests for events starting at the specified block ID or block height. The handler manages the subscription and sends the subscribed information to the client via the provided stream.

Responses are returned for each block containing at least one event that matches the filter. Additionally, heartbeat responses (SubscribeEventsResponse with no events) are returned periodically to allow clients to track which blocks were searched. Clients can use this information to determine which block to start from when reconnecting.

Expected errors during normal operation: - codes.InvalidArgument - if provided both startBlockID and startHeight, if invalid startBlockID is provided, if invalid event filter is provided. - codes.ResourceExhausted - if the maximum number of streams is reached. - codes.Internal - could not convert events to entity, if stream encountered an error, if stream got unexpected response or could not send response.

func (*Handler) SubscribeEventsFromLatest added in v0.33.30

SubscribeEventsFromLatest handles subscription requests for events started from latest sealed block.. The handler manages the subscription and sends the subscribed information to the client via the provided stream.

Responses are returned for each block containing at least one event that matches the filter. Additionally, heartbeat responses (SubscribeEventsResponse with no events) are returned periodically to allow clients to track which blocks were searched. Clients can use this information to determine which block to start from when reconnecting.

Expected errors during normal operation: - codes.InvalidArgument - if invalid event filter is provided. - codes.ResourceExhausted - if the maximum number of streams is reached. - codes.Internal - could not convert events to entity, if stream encountered an error, if stream got unexpected response or could not send response.

func (*Handler) SubscribeEventsFromStartBlockID added in v0.33.30

SubscribeEventsFromStartBlockID handles subscription requests for events starting at the specified block ID. The handler manages the subscription and sends the subscribed information to the client via the provided stream.

Responses are returned for each block containing at least one event that matches the filter. Additionally, heartbeat responses (SubscribeEventsResponse with no events) are returned periodically to allow clients to track which blocks were searched. Clients can use this information to determine which block to start from when reconnecting.

Expected errors during normal operation: - codes.InvalidArgument - if invalid startBlockID is provided, if invalid event filter is provided. - codes.ResourceExhausted - if the maximum number of streams is reached. - codes.Internal - could not convert events to entity, if stream encountered an error, if stream got unexpected response or could not send response.

func (*Handler) SubscribeEventsFromStartHeight added in v0.33.30

SubscribeEventsFromStartHeight handles subscription requests for events starting at the specified block height. The handler manages the subscription and sends the subscribed information to the client via the provided stream.

Responses are returned for each block containing at least one event that matches the filter. Additionally, heartbeat responses (SubscribeEventsResponse with no events) are returned periodically to allow clients to track which blocks were searched. Clients can use this information to determine which block to start from when reconnecting.

Expected errors during normal operation: - codes.InvalidArgument - if invalid event filter is provided. - codes.ResourceExhausted - if the maximum number of streams is reached. - codes.Internal - could not convert events to entity, if stream encountered an error, if stream got unexpected response or could not send response.

func (*Handler) SubscribeExecutionData

SubscribeExecutionData is deprecated and will be removed in a future version. Use SubscribeExecutionDataFromStartBlockID, SubscribeExecutionDataFromStartBlockHeight or SubscribeExecutionDataFromLatest.

SubscribeExecutionData handles subscription requests for execution data starting at the specified block ID or block height. The handler manages the subscription and sends the subscribed information to the client via the provided stream.

Expected errors during normal operation: - codes.InvalidArgument - if request contains invalid startBlockID. - codes.ResourceExhausted - if the maximum number of streams is reached. - codes.Internal - if stream got unexpected response or could not send response.

func (*Handler) SubscribeExecutionDataFromLatest added in v0.33.30

SubscribeExecutionDataFromLatest handles subscription requests for execution data starting at the latest block. The handler manages the subscription and sends the subscribed information to the client via the provided stream.

Expected errors during normal operation: - codes.ResourceExhausted - if the maximum number of streams is reached. - codes.Internal - if stream got unexpected response or could not send response.

func (*Handler) SubscribeExecutionDataFromStartBlockHeight added in v0.33.30

SubscribeExecutionDataFromStartBlockHeight handles subscription requests for execution data starting at the specified block height. The handler manages the subscription and sends the subscribed information to the client via the provided stream.

Expected errors during normal operation: - codes.ResourceExhausted - if the maximum number of streams is reached. - codes.Internal - if stream got unexpected response or could not send response.

func (*Handler) SubscribeExecutionDataFromStartBlockID added in v0.33.30

SubscribeExecutionDataFromStartBlockID handles subscription requests for execution data starting at the specified block ID. The handler manages the subscription and sends the subscribed information to the client via the provided stream.

Expected errors during normal operation: - codes.InvalidArgument - if request contains invalid startBlockID. - codes.ResourceExhausted - if the maximum number of streams is reached. - codes.Internal - if stream got unexpected response or could not send response.

type StateStreamBackend

type StateStreamBackend struct {
	tracker.ExecutionDataTracker

	ExecutionDataBackend
	EventsBackend
	AccountStatusesBackend
	// contains filtered or unexported fields
}

func New

func New(
	log zerolog.Logger,
	state protocol.State,
	headers storage.Headers,
	seals storage.Seals,
	results storage.ExecutionResults,
	execDataStore execution_data.ExecutionDataStore,
	execDataCache *cache.ExecutionDataCache,
	registers *execution.RegistersAsyncStore,
	eventsIndex *index.EventsIndex,
	useEventsIndex bool,
	registerIDsRequestLimit int,
	subscriptionHandler *subscription.SubscriptionHandler,
	executionDataTracker tracker.ExecutionDataTracker,
) (*StateStreamBackend, error)

func (*StateStreamBackend) GetRegisterValues added in v0.32.10

func (b *StateStreamBackend) GetRegisterValues(ids flow.RegisterIDs, height uint64) ([]flow.RegisterValue, error)

GetRegisterValues returns the register values for the given register IDs at the given block height.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL