client

package
v1.16.13
Published: Apr 16, 2026 License: MIT Imports: 17 Imported by: 34

Documentation

Constants

const (
	DefaultTimeoutSeconds = 30
)

Variables

var ErrNoEndpoint = errors.New("no endpoint is configured")
var ErrSubscriberClosed = errors.New("subscriber closed")

Functions

func CheckAndDial

func CheckAndDial(ctx context.Context, log log.Logger, addr string, connectTimeout time.Duration, options ...rpc.ClientOption) (*rpc.Client, error)

func IsURLAvailable

func IsURLAvailable(ctx context.Context, address string, timeout time.Duration) bool

func ProbeWS

func ProbeWS(ctx context.Context, url string) error

ProbeWS performs a lightweight websocket handshake against the given URL and closes immediately. It can be used in readiness checks to verify that the endpoint accepts websocket connections.
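A readiness check built on a probe of this shape might look like the sketch below. It is a minimal illustration, not code from this package: probeFunc, waitReady, and the fake probe are hypothetical names, and the only thing taken from the documentation is the func(ctx, url) error shape that ProbeWS has.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// probeFunc has the same shape as ProbeWS: func(ctx, url) error.
// The type name is hypothetical and exists only for this sketch.
type probeFunc func(ctx context.Context, url string) error

// waitReady calls probe until it succeeds, ctx is done, or maxTries is reached.
// It returns the number of probe calls made. Hypothetical helper.
func waitReady(ctx context.Context, probe probeFunc, url string, interval time.Duration, maxTries int) (int, error) {
	for try := 1; ; try++ {
		err := probe(ctx, url)
		if err == nil {
			return try, nil
		}
		if try >= maxTries {
			return try, fmt.Errorf("endpoint not ready after %d tries: %w", try, err)
		}
		select {
		case <-ctx.Done():
			return try, ctx.Err()
		case <-time.After(interval):
		}
	}
}

// demoReadiness uses a fake probe that fails twice before succeeding.
func demoReadiness() (int, error) {
	calls := 0
	fake := func(_ context.Context, _ string) error {
		calls++
		if calls < 3 {
			return errors.New("handshake refused")
		}
		return nil
	}
	return waitReady(context.Background(), fake, "ws://127.0.0.1:8546", time.Millisecond, 5)
}

func main() {
	tries, err := demoReadiness()
	fmt.Println("tries:", tries, "err:", err)
}
```

In real use, ProbeWS itself could be passed where the fake probe is, since its signature matches probeFunc.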

Types

type BaseRPCClient

type BaseRPCClient struct {
	// contains filtered or unexported fields
}

BaseRPCClient is a wrapper around a concrete *rpc.Client instance that makes it compliant with the client.RPC interface. By default, it applies a timeout of 10s to each CallContext and 20s to each BatchCallContext made through it.
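The idea of a wrapper that bounds every call with a default timeout can be sketched as follows. This is an illustration under assumptions, not the package's implementation: caller, timeoutCaller, and hangingCaller are hypothetical types, and the 20ms timeout is chosen only to keep the demo fast.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// caller is a hypothetical, trimmed-down stand-in for the call side of an RPC client.
type caller interface {
	CallContext(ctx context.Context, result any, method string, args ...any) error
}

// timeoutCaller bounds every call made through it with a default timeout.
type timeoutCaller struct {
	inner       caller
	callTimeout time.Duration
}

func (t *timeoutCaller) CallContext(ctx context.Context, result any, method string, args ...any) error {
	// context.WithTimeout never extends an earlier deadline already set on ctx.
	cctx, cancel := context.WithTimeout(ctx, t.callTimeout)
	defer cancel()
	return t.inner.CallContext(cctx, result, method, args...)
}

// hangingCaller simulates an endpoint that never answers.
type hangingCaller struct{}

func (hangingCaller) CallContext(ctx context.Context, _ any, _ string, _ ...any) error {
	<-ctx.Done()
	return ctx.Err()
}

// demoTimeout reports whether a hanging call was cut off by the default timeout.
func demoTimeout() bool {
	c := &timeoutCaller{inner: hangingCaller{}, callTimeout: 20 * time.Millisecond}
	err := c.CallContext(context.Background(), nil, "eth_chainId")
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println("timed out:", demoTimeout())
}
```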

func (*BaseRPCClient) BatchCallContext

func (b *BaseRPCClient) BatchCallContext(ctx context.Context, batch []rpc.BatchElem) error

func (*BaseRPCClient) CallContext

func (b *BaseRPCClient) CallContext(ctx context.Context, result any, method string, args ...any) error

func (*BaseRPCClient) Close

func (b *BaseRPCClient) Close()

func (*BaseRPCClient) Subscribe

func (b *BaseRPCClient) Subscribe(ctx context.Context, namespace string, channel any, args ...any) (ethereum.Subscription, error)

type BasicHTTPClient

type BasicHTTPClient struct {
	// contains filtered or unexported fields
}

func NewBasicHTTPClient

func NewBasicHTTPClient(endpoint string, log log.Logger, opts ...BasicHTTPClientOption) *BasicHTTPClient

func (*BasicHTTPClient) Get

func (cl *BasicHTTPClient) Get(ctx context.Context, p string, query url.Values, headers http.Header) (*http.Response, error)

type BasicHTTPClientOption

type BasicHTTPClientOption interface {
	Apply(c *BasicHTTPClient)
}

func WithHeader

func WithHeader(h http.Header) BasicHTTPClientOption

type BasicHTTPClientOptionFn

type BasicHTTPClientOptionFn func(*BasicHTTPClient)

func (BasicHTTPClientOptionFn) Apply
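The pairing of an option interface with a function adapter, as BasicHTTPClientOption and BasicHTTPClientOptionFn suggest, is a common Go pattern. The sketch below reproduces the pattern with hypothetical lower-case types (httpClient, option, optionFn, withHeader); it does not claim to match how this package stores or applies headers.

```go
package main

import (
	"fmt"
	"net/http"
)

// httpClient is a hypothetical client that options can configure.
type httpClient struct {
	header http.Header
}

// option mirrors the shape of an interface with a single Apply method.
type option interface {
	Apply(c *httpClient)
}

// optionFn adapts a plain function to the option interface.
type optionFn func(*httpClient)

// Apply calls the wrapped function.
func (fn optionFn) Apply(c *httpClient) { fn(c) }

// withHeader returns an option that sets the default headers.
// Cloning avoids sharing the caller's map (a choice made for this sketch).
func withHeader(h http.Header) option {
	return optionFn(func(c *httpClient) { c.header = h.Clone() })
}

// newHTTPClient applies the options in order.
func newHTTPClient(opts ...option) *httpClient {
	c := &httpClient{header: make(http.Header)}
	for _, o := range opts {
		o.Apply(c)
	}
	return c
}

// demoOptions builds a client with one header and reads it back.
func demoOptions() string {
	h := http.Header{}
	h.Set("Authorization", "Bearer example-token")
	return newHTTPClient(withHeader(h)).header.Get("Authorization")
}

func main() {
	fmt.Println(demoOptions())
}
```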

type ErrorDataProvider

type ErrorDataProvider interface {
	ErrorData() interface{}
}

type HTTP

type HTTP interface {
	Get(ctx context.Context, path string, query url.Values, headers http.Header) (*http.Response, error)
}

type PollingClient

type PollingClient struct {
	// contains filtered or unexported fields
}

PollingClient is an RPC client that provides newHeads subscriptions via a polling loop. It's designed for HTTP endpoints, but WS will work too.

func NewPollingClient

func NewPollingClient(ctx context.Context, lgr log.Logger, c RPC, opts ...WrappedHTTPClientOption) *PollingClient

NewPollingClient returns a new PollingClient. Canceling the passed-in context will close the client. Callers are responsible for closing the client in order to prevent resource leaks.

func (*PollingClient) BatchCallContext

func (w *PollingClient) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error

func (*PollingClient) CallContext

func (w *PollingClient) CallContext(ctx context.Context, result any, method string, args ...any) error

func (*PollingClient) Close

func (w *PollingClient) Close()

Close closes the PollingClient and the underlying RPC client it talks to.

func (*PollingClient) Subscribe

func (w *PollingClient) Subscribe(ctx context.Context, namespace string, channel any, args ...any) (ethereum.Subscription, error)

Subscribe supports eth_subscribe of block headers by creating a new newHeads subscription. It takes the same arguments as Geth's native Subscribe method. It returns an error if the namespace is not "eth", if the subscription type is not newHeads, or if the passed-in channel is not a channel of *types.Header.
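Emulating a newHeads subscription by polling can be sketched as below. This is a simplified model, not the PollingClient implementation: headPoller and its fetch function are hypothetical, and heads are compared by block number only to keep the example short.

```go
package main

import (
	"errors"
	"fmt"
)

// headPoller turns repeated "latest head" queries into new-head events.
type headPoller struct {
	fetch func() (uint64, error) // hypothetical stand-in for a "latest block" RPC call
	last  uint64
	seen  bool
}

// poll runs one iteration and reports true only when the head has changed.
func (p *headPoller) poll() (uint64, bool, error) {
	n, err := p.fetch()
	if err != nil {
		return 0, false, err
	}
	if p.seen && n == p.last {
		return n, false, nil
	}
	p.last, p.seen = n, true
	return n, true, nil
}

// demoPolling feeds canned responses through the poller and collects the events.
func demoPolling() []uint64 {
	responses := []uint64{100, 100, 101, 101, 102}
	i := 0
	p := &headPoller{fetch: func() (uint64, error) {
		if i >= len(responses) {
			return 0, errors.New("no more responses")
		}
		n := responses[i]
		i++
		return n, nil
	}}
	var heads []uint64
	for range responses { // a real loop would wait on a time.Ticker between polls
		if n, isNew, err := p.poll(); err == nil && isNew {
			heads = append(heads, n)
		}
	}
	return heads
}

func main() {
	fmt.Println(demoPolling())
}
```

Repeated responses for the same head produce no event, so a subscriber sees each new head once.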

type RPC

type RPC interface {
	Close()
	CallContext(ctx context.Context, result any, method string, args ...any) error
	BatchCallContext(ctx context.Context, b []rpc.BatchElem) error
	Subscribe(ctx context.Context, namespace string, channel any, args ...any) (ethereum.Subscription, error)
}

func NewBaseRPCClient

func NewBaseRPCClient(c *rpc.Client, opts ...RPCOption) RPC

func NewRPC

func NewRPC(ctx context.Context, lgr log.Logger, addr string, opts ...RPCOption) (RPC, error)

NewRPC returns the correct client.RPC instance for a given RPC url.
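One plausible way to pick a client by URL is to look at the scheme: HTTP endpoints have no push subscriptions and would need polling, while websocket endpoints can subscribe natively. The sketch below shows that decision only. needsPolling is a hypothetical helper, and the actual selection logic inside NewRPC may differ.

```go
package main

import (
	"fmt"
	"net/url"
)

// needsPolling reports whether subscriptions for addr would have to be
// emulated by polling. Assumption for this sketch: http and https lack
// push subscriptions, everything else is left untouched.
func needsPolling(addr string) (bool, error) {
	u, err := url.Parse(addr)
	if err != nil {
		return false, fmt.Errorf("parse RPC address: %w", err)
	}
	switch u.Scheme {
	case "http", "https":
		return true, nil
	default:
		// ws, wss, and other transports are not wrapped in this sketch.
		return false, nil
	}
}

func main() {
	for _, addr := range []string{"http://localhost:8545", "wss://example:8546/ws"} {
		poll, err := needsPolling(addr)
		fmt.Println(addr, "polling:", poll, "err:", err)
	}
}
```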

func NewRPCWithClient

func NewRPCWithClient(ctx context.Context, lgr log.Logger, addr string, underlying RPC, pollInterval time.Duration) (RPC, error)

NewRPCWithClient builds a new polling client with the given underlying RPC client.

type RPCOption

type RPCOption func(cfg *rpcConfig)

func WithBatchCallTimeout

func WithBatchCallTimeout(d time.Duration) RPCOption

func WithCallTimeout

func WithCallTimeout(d time.Duration) RPCOption

func WithConnectTimeout

func WithConnectTimeout(d time.Duration) RPCOption

func WithDialAttempts

func WithDialAttempts(attempts int) RPCOption

WithDialAttempts configures the number of attempts for the initial dial to the RPC. By default, attempts are executed with an exponential backoff strategy.

func WithFixedDialBackoff

func WithFixedDialBackoff(d time.Duration) RPCOption

WithFixedDialBackoff makes the RPC client wait a fixed delay d between dial attempts instead of using exponential backoff.
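The difference between fixed and exponential dial backoff can be sketched with a small retry loop. Everything here is illustrative: backoff, fixed, exponential, and dialWithRetry are hypothetical names, and the base delay and cap are assumptions rather than the package's defaults.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// backoff maps a zero-based attempt index to the delay before the next attempt.
type backoff func(attempt int) time.Duration

// fixed waits the same delay after every failed attempt.
func fixed(d time.Duration) backoff {
	return func(int) time.Duration { return d }
}

// exponential doubles the delay per attempt, capped at limit.
func exponential(base, limit time.Duration) backoff {
	return func(attempt int) time.Duration {
		d := base
		for i := 0; i < attempt && d < limit; i++ {
			d *= 2
		}
		if d > limit {
			d = limit
		}
		return d
	}
}

// dialWithRetry calls dial up to attempts times and returns the attempts used.
func dialWithRetry(attempts int, b backoff, dial func() error) (int, error) {
	if attempts < 1 {
		attempts = 1
	}
	var err error
	for i := 0; i < attempts; i++ {
		if err = dial(); err == nil {
			return i + 1, nil
		}
		if i < attempts-1 {
			time.Sleep(b(i))
		}
	}
	return attempts, fmt.Errorf("dial failed after %d attempts: %w", attempts, err)
}

// demoDial retries a dial that fails twice, using a fixed 1ms backoff.
func demoDial() (int, error) {
	calls := 0
	flaky := func() error {
		calls++
		if calls < 3 {
			return errors.New("connection refused")
		}
		return nil
	}
	return dialWithRetry(5, fixed(time.Millisecond), flaky)
}

// demoDelaySeconds lists the exponential delays (in seconds) for attempts 0..4.
func demoDelaySeconds() []int {
	b := exponential(time.Second, 10*time.Second)
	out := make([]int, 0, 5)
	for i := 0; i < 5; i++ {
		out = append(out, int(b(i)/time.Second))
	}
	return out
}

func main() {
	n, err := demoDial()
	fmt.Println("attempts:", n, "err:", err)
	fmt.Println("exponential delays (s):", demoDelaySeconds())
}
```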

func WithGethRPCOptions

func WithGethRPCOptions(gethRPCOptions ...rpc.ClientOption) RPCOption

WithGethRPCOptions passes the list of go-ethereum RPC options to the internal RPC instance.

func WithHttpPollInterval

func WithHttpPollInterval(duration time.Duration) RPCOption

WithHttpPollInterval configures the RPC to poll at the given rate, in case RPC subscriptions are not available.

func WithLazyDial

func WithLazyDial() RPCOption

WithLazyDial makes RPC client initialization skip the initial connection attempt. The connection is dialed on a later RPC request instead, and if that dial fails, subsequent requests try again. Any dial-backoff option is ignored when this option is used.
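Lazy dialing can be pictured as "connect on first use, and do not cache failures". The sketch below shows that behavior with hypothetical types (conn, lazyClient); it is not the package's implementation.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// conn is a hypothetical connection handle.
type conn struct{ id int }

// lazyClient dials only when a connection is first needed.
type lazyClient struct {
	mu   sync.Mutex
	dial func() (*conn, error)
	c    *conn
}

// get returns the cached connection or dials a new one.
// A failed dial is not cached, so the next request tries again.
func (l *lazyClient) get() (*conn, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.c != nil {
		return l.c, nil
	}
	c, err := l.dial()
	if err != nil {
		return nil, fmt.Errorf("lazy dial: %w", err)
	}
	l.c = c
	return c, nil
}

// demoLazy issues three requests against an endpoint that is down at first.
func demoLazy() (dials int, firstErr, secondErr error) {
	l := &lazyClient{dial: func() (*conn, error) {
		dials++
		if dials == 1 {
			return nil, errors.New("endpoint down")
		}
		return &conn{id: dials}, nil
	}}
	_, firstErr = l.get()  // first request: dial fails
	_, secondErr = l.get() // second request: dial is retried and succeeds
	_, _ = l.get()         // third request: cached connection is reused
	return dials, firstErr, secondErr
}

func main() {
	dials, first, second := demoLazy()
	fmt.Println("dials:", dials, "first:", first, "second:", second)
}
```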

func WithRPCRecorder

func WithRPCRecorder(recorder rpc.Recorder) RPCOption

WithRPCRecorder makes the RPC client use the given RPC recorder. Warning: this overwrites any previous recorder choice.

func WithRateLimit

func WithRateLimit(rateLimit float64, burst int) RPCOption

WithRateLimit configures the RPC to target the given rate limit (in requests / second). See NewRateLimitingClient for more details.

type RateLimitingClient

type RateLimitingClient struct {
	// contains filtered or unexported fields
}

RateLimitingClient is a wrapper around a pure RPC that implements a global rate-limit on requests.

func NewRateLimitingClient

func NewRateLimitingClient(c RPC, limit rate.Limit, burst int) *RateLimitingClient

NewRateLimitingClient implements a global rate limit for all RPC requests. A limit of N targets N requests per second over a long enough time frame. Burst bounds how far the client may deviate from that target by capping how many requests are allowed at once.
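The interplay of limit and burst follows the token-bucket model. The sketch below implements a tiny bucket with an injected clock so the behavior is easy to trace. It is a standard-library illustration only: tokenBucket is a hypothetical type, and it rejects requests when empty, whereas a rate-limiting client could just as well wait for a token.

```go
package main

import (
	"fmt"
	"time"
)

// tokenBucket refills at rate tokens per second and holds at most burst tokens.
type tokenBucket struct {
	rate   float64
	burst  float64
	tokens float64
	last   time.Time
}

// newTokenBucket starts with a full bucket at time now.
func newTokenBucket(rate float64, burst int, now time.Time) *tokenBucket {
	return &tokenBucket{rate: rate, burst: float64(burst), tokens: float64(burst), last: now}
}

// allow refills the bucket for the elapsed time, then spends one token if available.
func (b *tokenBucket) allow(now time.Time) bool {
	if elapsed := now.Sub(b.last).Seconds(); elapsed > 0 {
		b.tokens += elapsed * b.rate
		if b.tokens > b.burst {
			b.tokens = b.burst
		}
		b.last = now
	}
	if b.tokens < 1 {
		return false
	}
	b.tokens--
	return true
}

// demoBucket uses a limit of 1 request/second with a burst of 2.
func demoBucket() []bool {
	t0 := time.Unix(0, 0)
	b := newTokenBucket(1, 2, t0)
	return []bool{
		b.allow(t0),                  // burst token 1
		b.allow(t0),                  // burst token 2
		b.allow(t0),                  // bucket empty
		b.allow(t0.Add(time.Second)), // one token refilled after 1s
		b.allow(t0.Add(time.Second)), // empty again
	}
}

func main() {
	fmt.Println(demoBucket())
}
```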

func (*RateLimitingClient) BatchCallContext

func (b *RateLimitingClient) BatchCallContext(ctx context.Context, batch []rpc.BatchElem) error

func (*RateLimitingClient) CallContext

func (b *RateLimitingClient) CallContext(ctx context.Context, result any, method string, args ...any) error

func (*RateLimitingClient) Close

func (b *RateLimitingClient) Close()

func (*RateLimitingClient) Subscribe

func (b *RateLimitingClient) Subscribe(ctx context.Context, namespace string, channel any, args ...any) (ethereum.Subscription, error)

type WSClient

type WSClient struct {
	// contains filtered or unexported fields
}

WSClient is the canonical outbound websocket client for the monorepo. It wraps a coder/websocket connection, handles dialing with backoff, and exposes context-aware read/write helpers. New outbound websocket integrations should go through this type (or helpers built on top of it) rather than using websocket.Dial directly.

func DialWS

func DialWS(ctx context.Context, cfg WSConfig) (*WSClient, error)

DialWS establishes a websocket connection using the given configuration. It makes up to MaxAttempts connection attempts, waiting according to the configured backoff between them.

func (*WSClient) Close

func (c *WSClient) Close(status websocket.StatusCode, reason string) error

Close closes the websocket connection with the given status and reason.

func (*WSClient) Read

func (c *WSClient) Read(ctx context.Context) (websocket.MessageType, []byte, error)

Read reads the next message from the websocket connection. If the context has no deadline, a default read timeout from the config is applied.

func (*WSClient) ReadAll

func (c *WSClient) ReadAll(ctx context.Context, logger log.Logger, duration time.Duration, output chan<- []byte, done chan<- struct{}) error

ReadAll streams all websocket messages for the given duration into the provided output channel. It closes the done channel (when provided) after finishing the read loop.

func (*WSClient) Write

func (c *WSClient) Write(ctx context.Context, msgType websocket.MessageType, data []byte) error

Write writes a message to the websocket connection. If the context has no deadline, a default write timeout from the config is applied.
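The "apply a default timeout only when the context has no deadline" rule described for Read and Write can be sketched with the standard context package. withDefaultTimeout is a hypothetical helper, and treating a non-positive timeout as "no bound" is an assumption of this sketch.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// withDefaultTimeout returns ctx unchanged when it already has a deadline.
// Otherwise it derives a context bounded by d. Assumption: d <= 0 means no bound.
func withDefaultTimeout(ctx context.Context, d time.Duration) (context.Context, context.CancelFunc) {
	if _, ok := ctx.Deadline(); ok || d <= 0 {
		return ctx, func() {}
	}
	return context.WithTimeout(ctx, d)
}

// demoDefaultTimeout checks both branches of the rule.
func demoDefaultTimeout() (addedWhenMissing, keptWhenPresent bool) {
	// No deadline on the parent: the default is applied.
	ctx1, cancel1 := withDefaultTimeout(context.Background(), 5*time.Second)
	defer cancel1()
	_, addedWhenMissing = ctx1.Deadline()

	// Deadline already set by the caller: it is left as is.
	parent, cancelParent := context.WithTimeout(context.Background(), time.Minute)
	defer cancelParent()
	want, _ := parent.Deadline()
	ctx2, cancel2 := withDefaultTimeout(parent, 5*time.Second)
	defer cancel2()
	got, _ := ctx2.Deadline()
	keptWhenPresent = got.Equal(want)
	return addedWhenMissing, keptWhenPresent
}

func main() {
	added, kept := demoDefaultTimeout()
	fmt.Println("default added:", added, "caller deadline kept:", kept)
}
```

With this rule a caller's own deadline always wins, even when it is longer than the configured default.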

type WSConfig

type WSConfig struct {
	// URL is the websocket endpoint, e.g. wss://example:8546/ws.
	URL string
	// Headers are optional HTTP headers included in the websocket handshake.
	Headers http.Header

	// DialTimeout bounds the initial websocket dial.
	DialTimeout time.Duration
	// ReadTimeout bounds individual Read calls when a context without deadline is used.
	ReadTimeout time.Duration
	// WriteTimeout bounds individual Write calls when a context without deadline is used.
	WriteTimeout time.Duration

	// MaxAttempts configures how many dial attempts are made with backoff.
	// Defaults to 1 if zero.
	MaxAttempts int
	// Backoff is the backoff strategy used between dial attempts.
	// Defaults to retry.Exponential() if nil.
	Backoff retry.Strategy

	// Log is used for connection level logging.
	// If nil, logging is disabled.
	Log log.Logger
}

WSConfig configures a websocket connection. This is the shared configuration type for all outbound websocket clients in the codebase. Higher level users can build additional behavior on top, but should prefer DialWS / WSClient instead of constructing websocket.Dial calls directly.

type WrappedHTTPClientOption

type WrappedHTTPClientOption func(w *PollingClient)

func WithPollRate

func WithPollRate(duration time.Duration) WrappedHTTPClientOption

WithPollRate specifies the rate at which the PollingClient will poll for new heads. Setting this to zero disables polling altogether, which is useful for testing.
