 Documentation
      ¶
      Documentation
      ¶
    
    
  
    
  
    Index ¶
- Constants
- Variables
- type API
- type Config
- type Engine
- type EventFilter
- type EventFilterConfig
- type EventsBackend
- type EventsResponse
- type ExecutionDataBackend
- type ExecutionDataResponse
- type GetDataByHeightFunc
- type GetExecutionDataFunc
- type GetStartHeightFunc
- type Handler
- func (h *Handler) GetExecutionDataByBlockID(ctx context.Context, request *access.GetExecutionDataByBlockIDRequest) (*access.GetExecutionDataByBlockIDResponse, error)
- func (h *Handler) SubscribeEvents(request *access.SubscribeEventsRequest, ...) error
- func (h *Handler) SubscribeExecutionData(request *access.SubscribeExecutionDataRequest, ...) error
 
- type HeightBasedSubscription
- type ParsedEvent
- type ParsedEventType
- type StateStreamBackend
- type Streamable
- type Streamer
- type Subscription
- type SubscriptionImpl
- func (sub *SubscriptionImpl) Channel() <-chan interface{}
- func (sub *SubscriptionImpl) Close()
- func (sub *SubscriptionImpl) Err() error
- func (sub *SubscriptionImpl) Fail(err error)
- func (sub *SubscriptionImpl) ID() string
- func (sub *SubscriptionImpl) Send(ctx context.Context, v interface{}, timeout time.Duration) error
 
Constants ¶
const ( // DefaultMaxGlobalStreams defines the default max number of streams that can be open at the same time. DefaultMaxGlobalStreams = 1000 // DefaultCacheSize defines the default max number of objects for the execution data cache. DefaultCacheSize = 100 // DefaultSendTimeout is the default timeout for sending a message to the client. After the timeout // expires, the connection is closed. DefaultSendTimeout = 30 * time.Second )
const ( // DefaultMaxEventTypes is the default maximum number of event types that can be specified in a filter DefaultMaxEventTypes = 1000 // DefaultMaxAddresses is the default maximum number of addresses that can be specified in a filter DefaultMaxAddresses = 1000 // DefaultMaxContracts is the default maximum number of contracts that can be specified in a filter DefaultMaxContracts = 1000 )
const DefaultSendBufferSize = 10
    DefaultSendBufferSize is the default buffer size for the subscription's send channel. The size is chosen to balance memory overhead from each subscription with performance when streaming existing data.
Variables ¶
var DefaultEventFilterConfig = EventFilterConfig{ MaxEventTypes: DefaultMaxEventTypes, MaxAddresses: DefaultMaxAddresses, MaxContracts: DefaultMaxContracts, }
DefaultEventFilterConfig is the default configuration for EventFilters
Functions ¶
This section is empty.
Types ¶
type API ¶
type API interface {
	GetExecutionDataByBlockID(ctx context.Context, blockID flow.Identifier) (*execution_data.BlockExecutionData, error)
	SubscribeExecutionData(ctx context.Context, startBlockID flow.Identifier, startBlockHeight uint64) Subscription
	SubscribeEvents(ctx context.Context, startBlockID flow.Identifier, startHeight uint64, filter EventFilter) Subscription
}
    type Config ¶
type Config struct {
	EventFilterConfig
	// ListenAddr is the address the GRPC server will listen on as host:port
	ListenAddr string
	// MaxExecutionDataMsgSize is the max message size for block execution data API
	MaxExecutionDataMsgSize uint
	// RpcMetricsEnabled specifies whether to enable the GRPC metrics
	RpcMetricsEnabled bool
	// MaxGlobalStreams defines the global max number of streams that can be open at the same time.
	MaxGlobalStreams uint32
	// ExecutionDataCacheSize is the max number of objects for the execution data cache.
	ExecutionDataCacheSize uint32
	// ClientSendTimeout is the timeout for sending a message to the client. After the timeout,
	// the stream is closed with an error.
	ClientSendTimeout time.Duration
	// ClientSendBufferSize is the size of the response buffer for sending messages to the client.
	ClientSendBufferSize uint
}
    Config defines the configurable options for the ingress server.
type Engine ¶
type Engine struct {
	*component.ComponentManager
	// contains filtered or unexported fields
}
    Engine exposes the server with the state stream API. By default, this engine is not enabled. In order to run this engine a port for the GRPC server to be served on should be specified in the run config.
func NewEng ¶
func NewEng( log zerolog.Logger, config Config, execDataStore execution_data.ExecutionDataStore, state protocol.State, headers storage.Headers, seals storage.Seals, results storage.ExecutionResults, chainID flow.ChainID, apiRatelimits map[string]int, apiBurstLimits map[string]int, heroCacheMetrics module.HeroCacheMetrics, ) (*Engine, error)
NewEng returns a new ingress server.
func (*Engine) OnExecutionData ¶ added in v0.30.2
func (e *Engine) OnExecutionData(executionData *execution_data.BlockExecutionDataEntity)
OnExecutionData is called to notify the engine when a new execution data is received.
type EventFilter ¶ added in v0.30.2
type EventFilter struct {
	EventTypes map[flow.EventType]struct{}
	Addresses  map[string]struct{}
	Contracts  map[string]struct{}
	// contains filtered or unexported fields
}
    EventFilter represents a filter applied to events for a given subscription
func NewEventFilter ¶ added in v0.30.2
func NewEventFilter( config EventFilterConfig, chain flow.Chain, eventTypes []string, addresses []string, contracts []string, ) (EventFilter, error)
func (*EventFilter) Filter ¶ added in v0.30.2
func (f *EventFilter) Filter(events flow.EventsList) flow.EventsList
Filter applies the all filters on the provided list of events, and returns a list of events that match
type EventFilterConfig ¶ added in v0.30.2
EventFilterConfig is used to configure the limits for EventFilters
type EventsBackend ¶ added in v0.30.2
type EventsBackend struct {
	// contains filtered or unexported fields
}
    func (EventsBackend) SubscribeEvents ¶ added in v0.30.2
func (b EventsBackend) SubscribeEvents(ctx context.Context, startBlockID flow.Identifier, startHeight uint64, filter EventFilter) Subscription
type EventsResponse ¶ added in v0.30.2
type EventsResponse struct {
	BlockID flow.Identifier
	Height  uint64
	Events  flow.EventsList
}
    type ExecutionDataBackend ¶ added in v0.30.2
type ExecutionDataBackend struct {
	// contains filtered or unexported fields
}
    func (*ExecutionDataBackend) GetExecutionDataByBlockID ¶ added in v0.30.2
func (b *ExecutionDataBackend) GetExecutionDataByBlockID(ctx context.Context, blockID flow.Identifier) (*execution_data.BlockExecutionData, error)
func (*ExecutionDataBackend) SubscribeExecutionData ¶ added in v0.30.2
func (b *ExecutionDataBackend) SubscribeExecutionData(ctx context.Context, startBlockID flow.Identifier, startHeight uint64) Subscription
type ExecutionDataResponse ¶ added in v0.30.2
type ExecutionDataResponse struct {
	Height        uint64
	ExecutionData *execution_data.BlockExecutionData
}
    type GetDataByHeightFunc ¶ added in v0.30.2
GetDataByHeightFunc is a callback used by subscriptions to retrieve data for a given height. Expected errors: - storage.ErrNotFound - execution_data.BlobNotFoundError All other errors are considered exceptions
type GetExecutionDataFunc ¶ added in v0.30.2
type GetExecutionDataFunc func(context.Context, flow.Identifier) (*execution_data.BlockExecutionDataEntity, error)
type GetStartHeightFunc ¶ added in v0.30.2
type GetStartHeightFunc func(flow.Identifier, uint64) (uint64, error)
type Handler ¶
type Handler struct {
	// contains filtered or unexported fields
}
    func NewHandler ¶
func (*Handler) GetExecutionDataByBlockID ¶
func (h *Handler) GetExecutionDataByBlockID(ctx context.Context, request *access.GetExecutionDataByBlockIDRequest) (*access.GetExecutionDataByBlockIDResponse, error)
func (*Handler) SubscribeEvents ¶ added in v0.30.2
func (h *Handler) SubscribeEvents(request *access.SubscribeEventsRequest, stream access.ExecutionDataAPI_SubscribeEventsServer) error
func (*Handler) SubscribeExecutionData ¶ added in v0.30.2
func (h *Handler) SubscribeExecutionData(request *access.SubscribeExecutionDataRequest, stream access.ExecutionDataAPI_SubscribeExecutionDataServer) error
type HeightBasedSubscription ¶ added in v0.30.2
type HeightBasedSubscription struct {
	*SubscriptionImpl
	// contains filtered or unexported fields
}
    HeightBasedSubscription is a subscription that retrieves data sequentially by block height
func NewHeightBasedSubscription ¶ added in v0.30.2
func NewHeightBasedSubscription(bufferSize int, firstHeight uint64, getData GetDataByHeightFunc) *HeightBasedSubscription
type ParsedEvent ¶ added in v0.30.2
type ParsedEvent struct {
	Type         ParsedEventType
	EventType    flow.EventType
	Address      string
	Contract     string
	ContractName string
	Name         string
}
    func ParseEvent ¶ added in v0.30.2
func ParseEvent(eventType flow.EventType) (*ParsedEvent, error)
ParseEvent parses an event type into its parts. There are 2 valid EventType formats: - flow.[EventName] - A.[Address].[Contract].[EventName] Any other format results in an error.
type ParsedEventType ¶ added in v0.30.2
type ParsedEventType int
const ( ProtocolEventType ParsedEventType = iota + 1 AccountEventType )
type StateStreamBackend ¶
type StateStreamBackend struct {
	ExecutionDataBackend
	EventsBackend
	// contains filtered or unexported fields
}
    func New ¶
func New( log zerolog.Logger, config Config, state protocol.State, headers storage.Headers, seals storage.Seals, results storage.ExecutionResults, execDataStore execution_data.ExecutionDataStore, execDataCache *herocache.Cache, broadcaster *engine.Broadcaster, ) (*StateStreamBackend, error)
type Streamable ¶ added in v0.30.2
type Streamable interface {
	ID() string
	Close()
	Fail(error)
	Send(context.Context, interface{}, time.Duration) error
	Next(context.Context) (interface{}, error)
}
    Streamable represents a subscription that can be streamed.
type Streamer ¶ added in v0.30.2
type Streamer struct {
	// contains filtered or unexported fields
}
    Streamer
func NewStreamer ¶ added in v0.30.2
func NewStreamer( log zerolog.Logger, broadcaster *engine.Broadcaster, sendTimeout time.Duration, sub Streamable, ) *Streamer
type Subscription ¶ added in v0.30.2
type Subscription interface {
	// ID returns the unique identifier for this subscription used for logging
	ID() string
	// Channel returns the channel from which subscriptino data can be read
	Channel() <-chan interface{}
	// Err returns the error that caused the subscription to fail
	Err() error
}
    Subscription represents a streaming request, and handles the communication between the grpc handler and the backend implementation.
type SubscriptionImpl ¶ added in v0.30.2
type SubscriptionImpl struct {
	// contains filtered or unexported fields
}
    func NewSubscription ¶ added in v0.30.2
func NewSubscription(bufferSize int) *SubscriptionImpl
func (*SubscriptionImpl) Channel ¶ added in v0.30.2
func (sub *SubscriptionImpl) Channel() <-chan interface{}
Channel returns the channel from which subscriptino data can be read
func (*SubscriptionImpl) Close ¶ added in v0.30.2
func (sub *SubscriptionImpl) Close()
Close is called when a subscription ends gracefully, and closes the subscription channel
func (*SubscriptionImpl) Err ¶ added in v0.30.2
func (sub *SubscriptionImpl) Err() error
Err returns the error that caused the subscription to fail
func (*SubscriptionImpl) Fail ¶ added in v0.30.2
func (sub *SubscriptionImpl) Fail(err error)
Fail registers an error and closes the subscription channel
func (*SubscriptionImpl) ID ¶ added in v0.30.2
func (sub *SubscriptionImpl) ID() string
ID returns the subscription ID Note: this is not a cryptographic hash