Documentation
¶
Index ¶
- Constants
- Variables
- type API
- type Config
- type Engine
- type EventFilter
- type EventFilterConfig
- type EventsBackend
- type EventsResponse
- type ExecutionDataBackend
- type ExecutionDataResponse
- type GetDataByHeightFunc
- type GetExecutionDataFunc
- type GetStartHeightFunc
- type Handler
- func (h *Handler) GetExecutionDataByBlockID(ctx context.Context, request *access.GetExecutionDataByBlockIDRequest) (*access.GetExecutionDataByBlockIDResponse, error)
- func (h *Handler) SubscribeEvents(request *access.SubscribeEventsRequest, ...) error
- func (h *Handler) SubscribeExecutionData(request *access.SubscribeExecutionDataRequest, ...) error
- type HeightBasedSubscription
- type ParsedEvent
- type ParsedEventType
- type StateStreamBackend
- type Streamable
- type Streamer
- type Subscription
- type SubscriptionImpl
- func (sub *SubscriptionImpl) Channel() <-chan interface{}
- func (sub *SubscriptionImpl) Close()
- func (sub *SubscriptionImpl) Err() error
- func (sub *SubscriptionImpl) Fail(err error)
- func (sub *SubscriptionImpl) ID() string
- func (sub *SubscriptionImpl) Send(ctx context.Context, v interface{}, timeout time.Duration) error
Constants ¶
const ( // DefaultMaxGlobalStreams defines the default max number of streams that can be open at the same time. DefaultMaxGlobalStreams = 1000 // DefaultCacheSize defines the default max number of objects for the execution data cache. DefaultCacheSize = 100 // DefaultSendTimeout is the default timeout for sending a message to the client. After the timeout // expires, the connection is closed. DefaultSendTimeout = 30 * time.Second )
const ( // DefaultMaxEventTypes is the default maximum number of event types that can be specified in a filter DefaultMaxEventTypes = 1000 // DefaultMaxAddresses is the default maximum number of addresses that can be specified in a filter DefaultMaxAddresses = 1000 // DefaultMaxContracts is the default maximum number of contracts that can be specified in a filter DefaultMaxContracts = 1000 )
const DefaultSendBufferSize = 10
DefaultSendBufferSize is the default buffer size for the subscription's send channel. The size is chosen to balance memory overhead from each subscription with performance when streaming existing data.
Variables ¶
var DefaultEventFilterConfig = EventFilterConfig{ MaxEventTypes: DefaultMaxEventTypes, MaxAddresses: DefaultMaxAddresses, MaxContracts: DefaultMaxContracts, }
DefaultEventFilterConfig is the default configuration for EventFilters
Functions ¶
This section is empty.
Types ¶
type API ¶
type API interface {
GetExecutionDataByBlockID(ctx context.Context, blockID flow.Identifier) (*execution_data.BlockExecutionData, error)
SubscribeExecutionData(ctx context.Context, startBlockID flow.Identifier, startBlockHeight uint64) Subscription
SubscribeEvents(ctx context.Context, startBlockID flow.Identifier, startHeight uint64, filter EventFilter) Subscription
}
type Config ¶
type Config struct {
EventFilterConfig
// ListenAddr is the address the GRPC server will listen on as host:port
ListenAddr string
// MaxExecutionDataMsgSize is the max message size for block execution data API
MaxExecutionDataMsgSize uint
// RpcMetricsEnabled specifies whether to enable the GRPC metrics
RpcMetricsEnabled bool
// MaxGlobalStreams defines the global max number of streams that can be open at the same time.
MaxGlobalStreams uint32
// ExecutionDataCacheSize is the max number of objects for the execution data cache.
ExecutionDataCacheSize uint32
// ClientSendTimeout is the timeout for sending a message to the client. After the timeout,
// the stream is closed with an error.
ClientSendTimeout time.Duration
// ClientSendBufferSize is the size of the response buffer for sending messages to the client.
ClientSendBufferSize uint
}
Config defines the configurable options for the ingress server.
type Engine ¶
type Engine struct {
*component.ComponentManager
// contains filtered or unexported fields
}
Engine exposes the server with the state stream API. By default, this engine is not enabled. To run this engine, a port for the GRPC server to listen on must be specified in the run config.
func NewEng ¶
func NewEng( log zerolog.Logger, config Config, execDataStore execution_data.ExecutionDataStore, state protocol.State, headers storage.Headers, seals storage.Seals, results storage.ExecutionResults, chainID flow.ChainID, apiRatelimits map[string]int, apiBurstLimits map[string]int, heroCacheMetrics module.HeroCacheMetrics, ) (*Engine, error)
NewEng returns a new ingress server.
func (*Engine) OnExecutionData ¶ added in v0.30.2
func (e *Engine) OnExecutionData(executionData *execution_data.BlockExecutionDataEntity)
OnExecutionData is called to notify the engine when new execution data is received.
type EventFilter ¶ added in v0.30.2
type EventFilter struct {
EventTypes map[flow.EventType]struct{}
Addresses map[string]struct{}
Contracts map[string]struct{}
// contains filtered or unexported fields
}
EventFilter represents a filter applied to events for a given subscription
func NewEventFilter ¶ added in v0.30.2
func NewEventFilter( config EventFilterConfig, chain flow.Chain, eventTypes []string, addresses []string, contracts []string, ) (EventFilter, error)
func (*EventFilter) Filter ¶ added in v0.30.2
func (f *EventFilter) Filter(events flow.EventsList) flow.EventsList
Filter applies all filters to the provided list of events, and returns the list of events that match.
type EventFilterConfig ¶ added in v0.30.2
EventFilterConfig is used to configure the limits for EventFilters
type EventsBackend ¶ added in v0.30.2
type EventsBackend struct {
// contains filtered or unexported fields
}
func (EventsBackend) SubscribeEvents ¶ added in v0.30.2
func (b EventsBackend) SubscribeEvents(ctx context.Context, startBlockID flow.Identifier, startHeight uint64, filter EventFilter) Subscription
type EventsResponse ¶ added in v0.30.2
type EventsResponse struct {
BlockID flow.Identifier
Height uint64
Events flow.EventsList
}
type ExecutionDataBackend ¶ added in v0.30.2
type ExecutionDataBackend struct {
// contains filtered or unexported fields
}
func (*ExecutionDataBackend) GetExecutionDataByBlockID ¶ added in v0.30.2
func (b *ExecutionDataBackend) GetExecutionDataByBlockID(ctx context.Context, blockID flow.Identifier) (*execution_data.BlockExecutionData, error)
func (*ExecutionDataBackend) SubscribeExecutionData ¶ added in v0.30.2
func (b *ExecutionDataBackend) SubscribeExecutionData(ctx context.Context, startBlockID flow.Identifier, startHeight uint64) Subscription
type ExecutionDataResponse ¶ added in v0.30.2
type ExecutionDataResponse struct {
Height uint64
ExecutionData *execution_data.BlockExecutionData
}
type GetDataByHeightFunc ¶ added in v0.30.2
GetDataByHeightFunc is a callback used by subscriptions to retrieve data for a given height. Expected errors: storage.ErrNotFound and execution_data.BlobNotFoundError. All other errors are considered exceptions.
type GetExecutionDataFunc ¶ added in v0.30.2
type GetExecutionDataFunc func(context.Context, flow.Identifier) (*execution_data.BlockExecutionDataEntity, error)
type GetStartHeightFunc ¶ added in v0.30.2
type GetStartHeightFunc func(flow.Identifier, uint64) (uint64, error)
type Handler ¶
type Handler struct {
// contains filtered or unexported fields
}
func NewHandler ¶
func (*Handler) GetExecutionDataByBlockID ¶
func (h *Handler) GetExecutionDataByBlockID(ctx context.Context, request *access.GetExecutionDataByBlockIDRequest) (*access.GetExecutionDataByBlockIDResponse, error)
func (*Handler) SubscribeEvents ¶ added in v0.30.2
func (h *Handler) SubscribeEvents(request *access.SubscribeEventsRequest, stream access.ExecutionDataAPI_SubscribeEventsServer) error
func (*Handler) SubscribeExecutionData ¶ added in v0.30.2
func (h *Handler) SubscribeExecutionData(request *access.SubscribeExecutionDataRequest, stream access.ExecutionDataAPI_SubscribeExecutionDataServer) error
type HeightBasedSubscription ¶ added in v0.30.2
type HeightBasedSubscription struct {
*SubscriptionImpl
// contains filtered or unexported fields
}
HeightBasedSubscription is a subscription that retrieves data sequentially by block height
func NewHeightBasedSubscription ¶ added in v0.30.2
func NewHeightBasedSubscription(bufferSize int, firstHeight uint64, getData GetDataByHeightFunc) *HeightBasedSubscription
type ParsedEvent ¶ added in v0.30.2
type ParsedEvent struct {
Type ParsedEventType
EventType flow.EventType
Address string
Contract string
ContractName string
Name string
}
func ParseEvent ¶ added in v0.30.2
func ParseEvent(eventType flow.EventType) (*ParsedEvent, error)
ParseEvent parses an event type into its parts. There are 2 valid EventType formats: flow.[EventName] and A.[Address].[Contract].[EventName]. Any other format results in an error.
type ParsedEventType ¶ added in v0.30.2
type ParsedEventType int
const ( ProtocolEventType ParsedEventType = iota + 1 AccountEventType )
type StateStreamBackend ¶
type StateStreamBackend struct {
ExecutionDataBackend
EventsBackend
// contains filtered or unexported fields
}
func New ¶
func New( log zerolog.Logger, config Config, state protocol.State, headers storage.Headers, seals storage.Seals, results storage.ExecutionResults, execDataStore execution_data.ExecutionDataStore, execDataCache *herocache.Cache, broadcaster *engine.Broadcaster, ) (*StateStreamBackend, error)
type Streamable ¶ added in v0.30.2
type Streamable interface {
ID() string
Close()
Fail(error)
Send(context.Context, interface{}, time.Duration) error
Next(context.Context) (interface{}, error)
}
Streamable represents a subscription that can be streamed.
type Streamer ¶ added in v0.30.2
type Streamer struct {
// contains filtered or unexported fields
}
Streamer
func NewStreamer ¶ added in v0.30.2
func NewStreamer( log zerolog.Logger, broadcaster *engine.Broadcaster, sendTimeout time.Duration, sub Streamable, ) *Streamer
type Subscription ¶ added in v0.30.2
type Subscription interface {
// ID returns the unique identifier for this subscription used for logging
ID() string
// Channel returns the channel from which subscription data can be read
Channel() <-chan interface{}
// Err returns the error that caused the subscription to fail
Err() error
}
Subscription represents a streaming request, and handles the communication between the grpc handler and the backend implementation.
type SubscriptionImpl ¶ added in v0.30.2
type SubscriptionImpl struct {
// contains filtered or unexported fields
}
func NewSubscription ¶ added in v0.30.2
func NewSubscription(bufferSize int) *SubscriptionImpl
func (*SubscriptionImpl) Channel ¶ added in v0.30.2
func (sub *SubscriptionImpl) Channel() <-chan interface{}
Channel returns the channel from which subscription data can be read
func (*SubscriptionImpl) Close ¶ added in v0.30.2
func (sub *SubscriptionImpl) Close()
Close is called when a subscription ends gracefully, and closes the subscription channel
func (*SubscriptionImpl) Err ¶ added in v0.30.2
func (sub *SubscriptionImpl) Err() error
Err returns the error that caused the subscription to fail
func (*SubscriptionImpl) Fail ¶ added in v0.30.2
func (sub *SubscriptionImpl) Fail(err error)
Fail registers an error and closes the subscription channel
func (*SubscriptionImpl) ID ¶ added in v0.30.2
func (sub *SubscriptionImpl) ID() string
ID returns the subscription ID. Note: this is not a cryptographic hash.