subscription

Version: v0.46.2-util-print-chu...

This package is not in the latest version of its module.
Published: Feb 24, 2026 License: AGPL-3.0 Imports: 11 Imported by: 2

Documentation

Constants

const (
	// DefaultSendBufferSize is the default buffer size for the subscription's send channel.
	// The size is chosen to balance memory overhead from each subscription with performance when
	// streaming existing data.
	DefaultSendBufferSize = 10

	// DefaultMaxGlobalStreams defines the default max number of streams that can be open at the same time.
	DefaultMaxGlobalStreams = 1000

	// DefaultCacheSize defines the default max number of objects for the execution data cache.
	DefaultCacheSize = 100

	// DefaultSendTimeout is the default timeout for sending a message to the client. After the timeout
	// expires, the connection is closed.
	DefaultSendTimeout = 30 * time.Second

	// DefaultResponseLimit is the default maximum number of responses per second allowed on a
	// stream. After exceeding the limit, the stream is paused until more capacity is available.
	DefaultResponseLimit = float64(0)

	// DefaultHeartbeatInterval specifies the block interval at which heartbeat messages should be sent.
	DefaultHeartbeatInterval = 1
)

Variables

var ErrBlockNotReady = errors.New("block not ready")

ErrBlockNotReady represents an error indicating that a block is not yet available or ready.

var ErrEndOfData = errors.New("end of data")

ErrEndOfData represents an error indicating that no more data is available for streaming.
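Because both variables are sentinel errors created with errors.New, callers can match them with errors.Is even after they have been wrapped. The sketch below redeclares the two sentinels locally so that it compiles on its own. The helpers wrapNotReady and classify, and the actions they return, are hypothetical and only illustrate the matching pattern.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the package's sentinel errors, so the sketch compiles on its own.
var (
	ErrBlockNotReady = errors.New("block not ready")
	ErrEndOfData     = errors.New("end of data")
)

// wrapNotReady is a hypothetical helper that adds context to ErrBlockNotReady.
func wrapNotReady(height uint64) error {
	return fmt.Errorf("height %d: %w", height, ErrBlockNotReady)
}

// classify is a hypothetical helper that maps an error to a caller action.
// The action names are assumptions made for this example.
func classify(err error) string {
	switch {
	case err == nil:
		return "deliver"
	case errors.Is(err, ErrBlockNotReady):
		return "wait"
	case errors.Is(err, ErrEndOfData):
		return "close"
	default:
		return "fail"
	}
}

func main() {
	fmt.Println(classify(nil))
	fmt.Println(classify(wrapNotReady(42)))
	fmt.Println(classify(ErrEndOfData))
	fmt.Println(classify(errors.New("boom")))
}
```

Matching with errors.Is rather than == keeps the check working when intermediate layers wrap the sentinel with %w.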

Functions

func HandleResponse added in v0.38.0

func HandleResponse[T any](send chan<- any, transform func(resp T) (any, error)) func(resp T) error

HandleResponse processes a generic response of type T and sends it to the provided channel.

Parameters:
- send: The channel to which the processed response is sent.
- transform: A function to transform the response into the expected interface{} type.

No errors are expected during normal operations.
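To make the adapter pattern concrete, here is a minimal sketch of a function with the same signature as HandleResponse. The body is an assumption about one plausible implementation, not the package's source, and the lowercase name handleResponse marks it as a local stand-in.

```go
package main

import (
	"errors"
	"fmt"
)

// handleResponse is a local stand-in with the same signature as HandleResponse.
// Its body is an assumption: transform the response, then forward the result.
func handleResponse[T any](send chan<- any, transform func(resp T) (any, error)) func(resp T) error {
	return func(resp T) error {
		out, err := transform(resp)
		if err != nil {
			return fmt.Errorf("failed to transform response: %w", err)
		}
		// Blocks until the channel has capacity or a reader is ready.
		send <- out
		return nil
	}
}

func main() {
	send := make(chan any, 1)
	handle := handleResponse(send, func(n int) (any, error) {
		if n < 0 {
			return nil, errors.New("negative input")
		}
		return fmt.Sprintf("block-%d", n), nil
	})

	if err := handle(7); err == nil {
		fmt.Println(<-send)
	}
	fmt.Println(handle(-1))
}
```

The returned closure has the shape that HandleSubscription expects for its handleResponse parameter, so the two can be combined.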

func HandleSubscription

func HandleSubscription[T any](sub Subscription, handleResponse func(resp T) error) error

HandleSubscription is a generic handler for subscriptions to a specific type. It continuously listens to the subscription channel, handles the received responses, and sends the processed information to the client via the provided stream using handleResponse.

Parameters:
- sub: The subscription.
- handleResponse: The function responsible for handling the response of the subscribed type.

No errors are expected during normal operations.
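The listening loop described above can be sketched as follows. This is an assumed shape, not the package's source: handleSubscription, fakeSub, and runFake are hypothetical names, and the Subscription interface is redeclared locally so the example is self-contained.

```go
package main

import (
	"errors"
	"fmt"
)

// Subscription mirrors the interface documented in this package.
type Subscription interface {
	ID() string
	Channel() <-chan any
	Err() error
}

// fakeSub is a hypothetical in-memory Subscription used only for this sketch.
type fakeSub struct {
	ch  chan any
	err error
}

func (s *fakeSub) ID() string          { return "fake" }
func (s *fakeSub) Channel() <-chan any { return s.ch }
func (s *fakeSub) Err() error          { return s.err }

// handleSubscription is an assumed shape for a HandleSubscription-style loop.
func handleSubscription[T any](sub Subscription, handle func(resp T) error) error {
	for {
		v, ok := <-sub.Channel()
		if !ok {
			// Channel closed: report the failure cause, if there is one.
			if err := sub.Err(); err != nil {
				return fmt.Errorf("stream encountered an error: %w", err)
			}
			return nil
		}
		resp, ok := v.(T)
		if !ok {
			return fmt.Errorf("unexpected response type: %T", v)
		}
		if err := handle(resp); err != nil {
			return err
		}
	}
}

// runFake feeds items through a fakeSub and counts the handled string responses.
func runFake(items []any, failure error) (int, error) {
	sub := &fakeSub{ch: make(chan any, len(items)), err: failure}
	for _, it := range items {
		sub.ch <- it
	}
	close(sub.ch)

	handled := 0
	err := handleSubscription(sub, func(string) error {
		handled++
		return nil
	})
	return handled, err
}

func main() {
	fmt.Println(runFake([]any{"a", "b"}, nil))
	fmt.Println(runFake([]any{"a"}, errors.New("backend failure")))
	fmt.Println(runFake([]any{1}, nil))
}
```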

Types

type GetDataByHeightFunc

type GetDataByHeightFunc func(ctx context.Context, height uint64) (any, error)

GetDataByHeightFunc is a callback used by subscriptions to retrieve data for a given height.

Expected errors:
- storage.ErrNotFound
- execution_data.BlobNotFoundError

All other errors are considered exceptions.

type HeightBasedSubscription

type HeightBasedSubscription struct {
	*SubscriptionImpl
	// contains filtered or unexported fields
}

HeightBasedSubscription is a subscription that retrieves data sequentially by block height

func NewHeightBasedSubscription

func NewHeightBasedSubscription(bufferSize int, firstHeight uint64, getData GetDataByHeightFunc) *HeightBasedSubscription

func (*HeightBasedSubscription) Next

Next returns the value for the next height from the subscription
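A minimal sketch of sequential, height-based retrieval, assuming the cursor advances only after a successful lookup. The names heightCursor, newMapGetter, drain, and errNotFound (a stand-in for storage.ErrNotFound) are hypothetical. Only the GetDataByHeightFunc type is taken from this page.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// GetDataByHeightFunc matches the callback type documented above.
type GetDataByHeightFunc func(ctx context.Context, height uint64) (any, error)

// errNotFound is a local stand-in for storage.ErrNotFound.
var errNotFound = errors.New("not found")

// heightCursor is a hypothetical type that walks heights in order.
type heightCursor struct {
	nextHeight uint64
	getData    GetDataByHeightFunc
}

// Next fetches data for the current height and advances only on success,
// so a failed lookup can be retried at the same height.
func (c *heightCursor) Next(ctx context.Context) (any, error) {
	v, err := c.getData(ctx, c.nextHeight)
	if err != nil {
		return nil, fmt.Errorf("could not get data for height %d: %w", c.nextHeight, err)
	}
	c.nextHeight++
	return v, nil
}

// newMapGetter returns a callback backed by an in-memory map (illustration only).
func newMapGetter(data map[uint64]string) GetDataByHeightFunc {
	return func(ctx context.Context, height uint64) (any, error) {
		if err := ctx.Err(); err != nil {
			return nil, err
		}
		v, ok := data[height]
		if !ok {
			return nil, errNotFound
		}
		return v, nil
	}
}

// drain calls Next up to n times and stops at the first error.
func drain(c *heightCursor, n int) ([]any, error) {
	ctx := context.Background()
	var out []any
	for i := 0; i < n; i++ {
		v, err := c.Next(ctx)
		if err != nil {
			return out, err
		}
		out = append(out, v)
	}
	return out, nil
}

func main() {
	c := &heightCursor{
		nextHeight: 10,
		getData:    newMapGetter(map[uint64]string{10: "a", 11: "b"}),
	}
	vals, err := drain(c, 3)
	fmt.Println(vals, err, errors.Is(err, errNotFound))
}
```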

type Streamable

type Streamable interface {
	// ID returns the subscription ID
	// Note: this is not a cryptographic hash
	ID() string
	// Close is called when a subscription ends gracefully, and closes the subscription channel
	Close()
	// Fail registers an error and closes the subscription channel
	Fail(error)
	// Send sends a value to the subscription channel or returns an error
	// Expected errors:
	// - context.DeadlineExceeded if send timed out
	// - context.Canceled if the client disconnected
	Send(context.Context, any, time.Duration) error
	// Next returns the value for the next height from the subscription
	Next(context.Context) (any, error)
}

Streamable represents a subscription that can be streamed.

type Streamer

type Streamer struct {
	// contains filtered or unexported fields
}

Streamer represents a streaming subscription that delivers data to clients.

func NewStreamer

func NewStreamer(
	log zerolog.Logger,
	broadcaster *engine.Broadcaster,
	sendTimeout time.Duration,
	limit float64,
	sub Streamable,
) *Streamer

NewStreamer creates a new Streamer instance.

func (*Streamer) Stream

func (s *Streamer) Stream(ctx context.Context)

Stream is a blocking method that streams data to the subscription until either the context is canceled or it encounters an error. This function follows a somewhat unintuitive contract: a canceled context is treated as an error and written to the subscription. Consumers of the subscription can rely on this behavior and handle that error as a graceful shutdown.
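On the consumer side, this contract usually means checking the subscription's error against context.Canceled before treating it as a failure. The sketch below simulates that with hypothetical names (streamUntilDone, isGracefulShutdown, simulateCancel, errBackend). It does not use the real Streamer, and the wrapping of ctx.Err() is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errBackend is a hypothetical failure that is unrelated to cancellation.
var errBackend = errors.New("backend failure")

// streamUntilDone is a stand-in for a Stream-like loop: when ctx is canceled,
// it reports ctx.Err() through fail, mirroring the contract described above.
func streamUntilDone(ctx context.Context, fail func(error)) {
	<-ctx.Done()
	fail(fmt.Errorf("stream stopped: %w", ctx.Err()))
}

// isGracefulShutdown reports whether err only reflects a canceled context.
func isGracefulShutdown(err error) bool {
	return errors.Is(err, context.Canceled)
}

// simulateCancel cancels a context and returns the error the loop recorded.
func simulateCancel() error {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	var recorded error
	streamUntilDone(ctx, func(err error) { recorded = err })
	return recorded
}

func main() {
	fmt.Println(isGracefulShutdown(simulateCancel()))
	fmt.Println(isGracefulShutdown(errBackend))
}
```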

type StreamingData

type StreamingData struct {
	MaxStreams  int32
	StreamCount atomic.Int32
}

StreamingData represents common streaming data configuration for access and state_stream handlers.

func NewStreamingData

func NewStreamingData(maxStreams uint32) StreamingData
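The two fields suggest a simple admission check: compare the live stream count with the maximum before accepting a new stream. A minimal sketch under that assumption follows. The streamLimiter type and its tryAcquire and release methods are hypothetical and are not part of this package.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// streamLimiter is a hypothetical type with the same two fields as StreamingData.
type streamLimiter struct {
	MaxStreams  int32
	StreamCount atomic.Int32
}

// tryAcquire reserves a stream slot, or reports false when the limit is reached.
func (l *streamLimiter) tryAcquire() bool {
	if l.StreamCount.Add(1) > l.MaxStreams {
		l.StreamCount.Add(-1) // roll back the reservation
		return false
	}
	return true
}

// release frees a slot previously reserved with tryAcquire.
func (l *streamLimiter) release() {
	l.StreamCount.Add(-1)
}

func main() {
	l := &streamLimiter{MaxStreams: 1}
	fmt.Println(l.tryAcquire(), l.tryAcquire())
	l.release()
	fmt.Println(l.tryAcquire())
}
```

Incrementing first and rolling back on overflow avoids the gap between a separate load and a later increment, at the cost of briefly overshooting the counter.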

type Subscription

type Subscription interface {
	// ID returns the unique identifier for this subscription used for logging
	ID() string

	// Channel returns the channel from which subscription data can be read
	Channel() <-chan any

	// Err returns the error that caused the subscription to fail
	Err() error
}

Subscription represents a streaming request, and handles the communication between the gRPC handler and the backend implementation.

type SubscriptionHandler

type SubscriptionHandler struct {
	// contains filtered or unexported fields
}

SubscriptionHandler represents common streaming data configuration for creating streaming subscriptions.

func NewSubscriptionHandler

func NewSubscriptionHandler(
	log zerolog.Logger,
	broadcaster *engine.Broadcaster,
	sendTimeout time.Duration,
	responseLimit float64,
	sendBufferSize uint,
) *SubscriptionHandler

NewSubscriptionHandler creates a new SubscriptionHandler instance.

Parameters:
- log: The logger to use for logging.
- broadcaster: The engine broadcaster for publishing notifications.
- sendTimeout: The duration after which a send operation will time out.
- responseLimit: The maximum number of responses per second allowed on a single stream.
- sendBufferSize: The size of the response buffer for sending messages to the client.

Returns a new SubscriptionHandler instance.

func (*SubscriptionHandler) Subscribe

func (h *SubscriptionHandler) Subscribe(
	ctx context.Context,
	startHeight uint64,
	getData GetDataByHeightFunc,
) Subscription

Subscribe creates and starts a new subscription.

Parameters:
- ctx: The context for the operation.
- startHeight: The height to start the subscription from.
- getData: The function to retrieve data by height.

type SubscriptionImpl

type SubscriptionImpl struct {
	// contains filtered or unexported fields
}

func NewFailedSubscription

func NewFailedSubscription(err error, msg string) *SubscriptionImpl

NewFailedSubscription returns a new subscription that has already failed with the given error and message. This is useful to return an error that occurred during subscription setup.

func NewSubscription

func NewSubscription(bufferSize int) *SubscriptionImpl

func (*SubscriptionImpl) Channel

func (sub *SubscriptionImpl) Channel() <-chan any

Channel returns the channel from which subscription data can be read

func (*SubscriptionImpl) Close

func (sub *SubscriptionImpl) Close()

Close is called when a subscription ends gracefully, and closes the subscription channel

func (*SubscriptionImpl) Err

func (sub *SubscriptionImpl) Err() error

Err returns the error that caused the subscription to fail

func (*SubscriptionImpl) Fail

func (sub *SubscriptionImpl) Fail(err error)

Fail registers an error and closes the subscription channel

func (*SubscriptionImpl) ID

func (sub *SubscriptionImpl) ID() string

ID returns the subscription ID.

Note: this is not a cryptographic hash.

func (*SubscriptionImpl) Send

func (sub *SubscriptionImpl) Send(ctx context.Context, v any, timeout time.Duration) error

Send sends a value to the subscription channel or returns an error.

Expected errors:
- context.DeadlineExceeded if the send timed out
- context.Canceled if the client disconnected
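The two expected errors map directly onto a context with a timeout: the derived context reports context.DeadlineExceeded when the timer fires and context.Canceled when the parent is canceled. The sketch below shows one way a send with that behavior could be written. The function sendWithTimeout and the helpers around it are hypothetical and are not the package's implementation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// sendWithTimeout is a hypothetical function that sends v on ch, giving up when
// the timeout elapses or when ctx is canceled, whichever happens first.
func sendWithTimeout(ctx context.Context, ch chan<- any, v any, timeout time.Duration) error {
	waitCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	select {
	case <-waitCtx.Done():
		return waitCtx.Err()
	case ch <- v:
		return nil
	}
}

// describe turns the result of a send into a short label.
func describe(err error) string {
	switch {
	case err == nil:
		return "sent"
	case errors.Is(err, context.DeadlineExceeded):
		return "timed out"
	case errors.Is(err, context.Canceled):
		return "canceled"
	default:
		return "other"
	}
}

// sendToBlockedChannel uses an unbuffered channel with no reader, so the send
// can never proceed and the timeout fires.
func sendToBlockedChannel() error {
	return sendWithTimeout(context.Background(), make(chan any), "x", 10*time.Millisecond)
}

// sendAfterCancel simulates a client that disconnected before the send.
func sendAfterCancel() error {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return sendWithTimeout(ctx, make(chan any), "x", time.Second)
}

// sendToReadyChannel uses a buffered channel with free capacity.
func sendToReadyChannel() error {
	return sendWithTimeout(context.Background(), make(chan any, 1), "x", time.Second)
}

func main() {
	fmt.Println(describe(sendToReadyChannel()))
	fmt.Println(describe(sendToBlockedChannel()))
	fmt.Println(describe(sendAfterCancel()))
}
```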
