connection

package
v0.6.2 Latest
Warning

This package is not in the latest version of its module.

Published: Jul 16, 2025 License: Apache-2.0, MIT Imports: 27 Imported by: 0

README

Connection Package

This package provides connection management for different network protocols (RPC, gRPC) used in the Locust project.

Recent Changes

The gRPC functionality previously in the separate grpc package has been consolidated into this package to:

  1. Reduce code duplication
  2. Provide a unified interface for all connection types
  3. Simplify maintenance of connection management logic
  4. Standardize how we handle connection failover and health checking

Components

RPC Clients
  • MultiEndpointRPCClient: Manages multiple RPC endpoints with automatic failover
  • InitRPCClient: Creates a single RPC client to a specific endpoint
gRPC Clients
  • MultiEndpointGRPCClient: Manages multiple gRPC endpoints with automatic failover
  • SetupGRPCConnection: Creates a single gRPC connection to a specific endpoint
Message Management
  • MessageSender: Interface for sending messages to the blockchain
  • DefaultMessageSender: Default implementation of the MessageSender interface
  • Various helper functions for broadcasting transactions with retry logic
Client Registry
  • ClientRegistry: Centralized registry of Cosmos clients for different chains
  • Methods for initializing, retrieving, and managing client instances

Migration from grpc package

If you were previously using the grpc package directly, you should update your imports to use the connection package instead:

// Before
import "github.com/margined-protocol/locust-core/pkg/grpc"

// After
import "github.com/margined-protocol/locust-core/pkg/connection"

Function and type names remain the same, so code should work with minimal changes beyond the import path update.

Documentation

Constants

const (
	// Default interval between gRPC health checks
	DefaultGRPCHealthCheckInterval = 30 * time.Second
	// Default timeout for gRPC connection attempts
	DefaultGRPCConnectionTimeout = 10 * time.Second
)
const (
	// Default interval between RPC health checks
	DefaultRPCHealthCheckInterval = 30 * time.Second
	// Default timeout for RPC connection attempts
	DefaultRPCConnectionTimeout = 10 * time.Second
)

Variables

This section is empty.

Functions

func BroadcastShortTermOrder

func BroadcastShortTermOrder(ctx context.Context, l *zap.Logger, cosmosClient *cosmosclient.Client, account cosmosaccount.Account, msgs ...sdk.Msg) error

BroadcastShortTermOrder broadcasts a transaction and checks for immediate errors. It is designed specifically for dYdX short-term orders, which are not included in blocks.

func BroadcastWithRetry

func BroadcastWithRetry(ctx context.Context, l *zap.Logger, cosmosClient *cosmosclient.Client, account cosmosaccount.Account, cfg *types.Config, msgs ...sdk.Msg) error

func BroadcastWithRetryAndResponse

func BroadcastWithRetryAndResponse(ctx context.Context, l *zap.Logger, cosmosClient *cosmosclient.Client, account cosmosaccount.Account, cfg *types.Config, msgs ...sdk.Msg) (*cosmosclient.Response, error)

func GetBlockHeight

func GetBlockHeight(ctx context.Context, l *zap.Logger, rpcServerAddress, prefix string) (*int64, error)

func GetGranteeAccount

func GetGranteeAccount(c *cosmosclient.Client, signer, prefix string) (cosmosaccount.Account, error)

func GetGranteeAddress

func GetGranteeAddress(c *cosmosclient.Client, signer, prefix string) (string, error)

func GetSignerAccountAndAddress

func GetSignerAccountAndAddress(c *cosmosclient.Client, signer, prefix string) (*cosmosaccount.Account, string, error)

GetSignerAccountAndAddress returns the signer account and its address.

func GetTendermintClient

func GetTendermintClient(l *zap.Logger, rpchttp *http.HTTP) cometbft.CometRPC

func InitCosmosClient

func InitCosmosClient(ctx context.Context, l *zap.Logger, chain *types.Chain, key *types.SigningKey) (*cosmosclient.Client, error)

InitCosmosClient initializes a Cosmos client with retry logic

func InitCosmosQueryClient

func InitCosmosQueryClient(ctx context.Context, l *zap.Logger, serverAddress, addressPrefix string) (*cosmosclient.Client, error)

InitCosmosQueryClient initialises a lightweight cosmosclient for querying that can be disposed of after use.

func InitFeeClient

func InitFeeClient(ctx context.Context, l *zap.Logger, chain *types.Chain, key *types.SigningKey) (*cosmosclient.Client, error)

InitFeeClient initialises a cosmosclient for executing transactions.

func InitRPCClient

func InitRPCClient(logger *zap.Logger, serverAddress, websocketPath string, apiToken string) (*rpchttp.HTTP, cometbft.CometRPC, error)

InitRPCClient initialises an RPC client with websocket support. It returns an error if the websocket connection fails.

func SendMessages

func SendMessages(
	ctx context.Context, l *zap.Logger,
	clientRegistry *ClientRegistry,
	chainMsg ChainMessage,
	messageSender MessageSender,
	cfg *types.Config,
	isDryRun, isFeeClient, wrapAuthz bool,
) (*cosmosclient.Response, error)

func SetupGRPCConnection added in v0.6.1

func SetupGRPCConnection(address string, useTLS bool, apiToken string) (*grpc.ClientConn, error)

SetupGRPCConnection establishes a gRPC connection, optionally using the system's TLS certificates.

Types

type ChainMessage

type ChainMessage struct {
	ChainID     string    // Chain ID where this message should be executed
	Messages    []sdk.Msg // Messages to be executed on the source chain
	IsFeeClient bool      // Whether to use the fee client
	WrapAuthz   bool      // Whether to wrap the messages in an Authz MsgExec
}

ChainMessage wraps a message with its source chain information for proper routing

func NewAuthzChainMsg

func NewAuthzChainMsg(chainID string, msgs []sdk.Msg) ChainMessage

NewAuthzChainMsg creates a new ChainMessage with WrapAuthz set to true

func NewChainMsg

func NewChainMsg(chainID string, msgs []sdk.Msg) ChainMessage

NewChainMsg creates a new ChainMessage with WrapAuthz set to false
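
The two constructors differ only in the WrapAuthz flag. The sketch below mirrors the documented ChainMessage fields in a standalone program to make that difference concrete. `Msg` is a placeholder for `sdk.Msg` so the example compiles without the Cosmos SDK, the lower-case constructor names are local stand-ins, and leaving IsFeeClient at its zero value is an assumption.

```go
package main

import "fmt"

// Msg is a placeholder for sdk.Msg (assumption, to keep the sketch standalone).
type Msg string

// ChainMessage mirrors the fields documented for the package's type.
type ChainMessage struct {
	ChainID     string
	Messages    []Msg
	IsFeeClient bool
	WrapAuthz   bool
}

// newChainMsg builds a message that is sent as-is (WrapAuthz is false).
func newChainMsg(chainID string, msgs []Msg) ChainMessage {
	return ChainMessage{ChainID: chainID, Messages: msgs, WrapAuthz: false}
}

// newAuthzChainMsg builds a message that should be wrapped in an Authz MsgExec.
func newAuthzChainMsg(chainID string, msgs []Msg) ChainMessage {
	return ChainMessage{ChainID: chainID, Messages: msgs, WrapAuthz: true}
}

func main() {
	plain := newChainMsg("chain-a", []Msg{"send"})
	authz := newAuthzChainMsg("chain-a", []Msg{"send"})
	fmt.Println("plain wraps authz:", plain.WrapAuthz)
	fmt.Println("authz wraps authz:", authz.WrapAuthz)
}
```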

type ClientEntry

type ClientEntry struct {
	Chain      *types.Chain
	Key        *types.SigningKey
	GRPCClient *MultiEndpointGRPCClient
	RPCClient  *MultiEndpointRPCClient
}

ClientEntry represents a registered chain with its client and configuration

type ClientInstance

type ClientInstance struct {
	Client     *cosmosclient.Client
	Chain      *types.Chain
	Key        *types.SigningKey
	GRPCClient *MultiEndpointGRPCClient
	RPCClient  *MultiEndpointRPCClient
}

ClientInstance bundles a Cosmos client with the chain configuration, signing key, and multi-endpoint clients of a registered chain

type ClientRegistry

type ClientRegistry struct {
	// contains filtered or unexported fields
}

ClientRegistry manages connections to multiple chains

func NewClientRegistry

func NewClientRegistry(ctx context.Context, logger *zap.Logger, signerAccount string) *ClientRegistry

NewClientRegistry creates a new client registry

func (*ClientRegistry) Close

func (r *ClientRegistry) Close()

Close closes all clients

func (*ClientRegistry) GetBalance

func (r *ClientRegistry) GetBalance(ctx context.Context, chainID, signerAccount, denom string) (*sdkmath.Int, error)

GetBalance retrieves the balance of an account on a chain by its ID

func (*ClientRegistry) GetClient

func (r *ClientRegistry) GetClient(chainID string, isFeeClient bool) (*ClientInstance, error)

GetClient retrieves a chain client entry by its ID

func (*ClientRegistry) GetGRPCClient added in v0.6.1

func (r *ClientRegistry) GetGRPCClient(chainID string) (*MultiEndpointGRPCClient, error)

GetGRPCClient returns the multi-endpoint gRPC client for a chain

func (*ClientRegistry) GetHeight

func (r *ClientRegistry) GetHeight(ctx context.Context, chainID string) (*int64, error)

GetHeight retrieves the block height of a chain by its ID

func (*ClientRegistry) GetRPCClient added in v0.6.1

func (r *ClientRegistry) GetRPCClient(chainID string) (*MultiEndpointRPCClient, error)

GetRPCClient returns the multi-endpoint RPC client for a chain

func (*ClientRegistry) GetSignerAccountAndAddress

func (r *ClientRegistry) GetSignerAccountAndAddress(signerAccount, chainID string) (*cosmosaccount.Account, string, error)

GetSignerAccountAndAddress retrieves the account and address for a specific chain

func (*ClientRegistry) HasClient

func (r *ClientRegistry) HasClient(chainID string) bool

HasClient checks if a client is registered

func (*ClientRegistry) RegisterClient

func (r *ClientRegistry) RegisterClient(chain *types.Chain, key *types.SigningKey) error

RegisterClient adds a new chain client to the registry
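
ClientRegistry's fields are unexported, so its storage is not visible here. The register, check, and retrieve pattern exposed by RegisterClient, HasClient, and GetClient can be sketched as a mutex-guarded map keyed by chain ID. Everything below (`registry`, `entry`, and the rule that duplicate or empty chain IDs are rejected) is a hypothetical illustration, not the package's behaviour.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// entry is a hypothetical stand-in for the per-chain data a registry holds.
type entry struct {
	ChainID  string
	Endpoint string
}

// registry stores entries by chain ID and is safe for concurrent use.
type registry struct {
	mu      sync.RWMutex
	entries map[string]entry
}

func newRegistry() *registry {
	return &registry{entries: make(map[string]entry)}
}

// register adds an entry, rejecting empty and duplicate chain IDs.
func (r *registry) register(e entry) error {
	if e.ChainID == "" {
		return errors.New("chain ID must not be empty")
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.entries[e.ChainID]; ok {
		return fmt.Errorf("chain %q is already registered", e.ChainID)
	}
	r.entries[e.ChainID] = e
	return nil
}

// has reports whether a chain ID is registered.
func (r *registry) has(chainID string) bool {
	r.mu.RLock()
	defer r.mu.RUnlock()
	_, ok := r.entries[chainID]
	return ok
}

// get returns the entry for a chain ID, or an error if it is unknown.
func (r *registry) get(chainID string) (entry, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	e, ok := r.entries[chainID]
	if !ok {
		return entry{}, fmt.Errorf("chain %q is not registered", chainID)
	}
	return e, nil
}

func main() {
	reg := newRegistry()
	if err := reg.register(entry{ChainID: "chain-a", Endpoint: "localhost:9090"}); err != nil {
		fmt.Println("register failed:", err)
		return
	}
	fmt.Println("has chain-a:", reg.has("chain-a"))
	_, err := reg.get("chain-b")
	fmt.Println("lookup of chain-b:", err)
}
```

A read-write mutex fits this shape because lookups are usually far more frequent than registrations.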

type DefaultMessageSender

type DefaultMessageSender struct{}

DefaultMessageSender is a default implementation of the MessageSender interface

func (*DefaultMessageSender) SendAuthzMessages

func (*DefaultMessageSender) SendAuthzMessages(ctx context.Context, l *zap.Logger, c *cosmosclient.Client, cfg *types.Config, msgs ...sdk.Msg) error

SendAuthzMessages sends the given messages using the cosmos client and grantee information. It returns an error if any step fails.

func (*DefaultMessageSender) SendAuthzMessagesWithResponse

func (*DefaultMessageSender) SendAuthzMessagesWithResponse(ctx context.Context, l *zap.Logger, c *cosmosclient.Client, cfg *types.Config, msgs ...sdk.Msg) (*cosmosclient.Response, error)

SendAuthzMessagesWithResponse sends the given messages using the cosmos client and grantee information, and returns the resulting response. It returns an error if any step fails.

func (*DefaultMessageSender) SendMessages

func (*DefaultMessageSender) SendMessages(ctx context.Context, l *zap.Logger, c *cosmosclient.Client, cfg *types.Config, msgs ...sdk.Msg) error

SendMessages sends the given messages using the cosmos client and grantee information. It returns an error if any step fails.

func (*DefaultMessageSender) SendMessagesWithResponse

func (*DefaultMessageSender) SendMessagesWithResponse(ctx context.Context, l *zap.Logger, c *cosmosclient.Client, cfg *types.Config, msgs ...sdk.Msg) (*cosmosclient.Response, error)

SendMessagesWithResponse sends the given messages using the cosmos client and grantee information, and returns the resulting response. It returns an error if any step fails.

func (*DefaultMessageSender) SendShortTermMessage

func (*DefaultMessageSender) SendShortTermMessage(ctx context.Context, l *zap.Logger, c *cosmosclient.Client, cfg *types.Config, msgs ...sdk.Msg) error

SendShortTermMessage sends the given messages using the cosmos client and grantee information. It returns an error if any step fails.

type GRPCEndpointConfig added in v0.6.1

type GRPCEndpointConfig struct {
	Address string
	UseTLS  bool
	APIKey  string
}

GRPCEndpointConfig represents a gRPC endpoint with its TLS configuration

type MessageSender

type MessageSender interface {
	SendAuthzMessages(ctx context.Context, l *zap.Logger, c *cosmosclient.Client, cfg *types.Config, msgs ...sdk.Msg) error
	SendAuthzMessagesWithResponse(ctx context.Context, l *zap.Logger, c *cosmosclient.Client, cfg *types.Config, msgs ...sdk.Msg) (*cosmosclient.Response, error)
	SendMessages(ctx context.Context, l *zap.Logger, c *cosmosclient.Client, cfg *types.Config, msgs ...sdk.Msg) error
	SendMessagesWithResponse(ctx context.Context, l *zap.Logger, c *cosmosclient.Client, cfg *types.Config, msgs ...sdk.Msg) (*cosmosclient.Response, error)
}

MessageSender defines an interface for sending messages using the cosmos client

type MultiEndpointGRPCClient added in v0.6.1

type MultiEndpointGRPCClient struct {
	// contains filtered or unexported fields
}

MultiEndpointGRPCClient manages multiple gRPC endpoints with automatic failover

func NewMultiEndpointGRPCClient added in v0.6.1

func NewMultiEndpointGRPCClient(
	ctx context.Context,
	logger *zap.Logger,
	endpoints []GRPCEndpointConfig,
) (*MultiEndpointGRPCClient, error)

NewMultiEndpointGRPCClient creates a new client that manages multiple gRPC endpoints

func NewMultiEndpointGRPCClientWithAddresses added in v0.6.1

func NewMultiEndpointGRPCClientWithAddresses(
	ctx context.Context,
	logger *zap.Logger,
	addresses []string,
	useTLS bool,
) (*MultiEndpointGRPCClient, error)

NewMultiEndpointGRPCClientWithAddresses is a helper that creates a client from a list of addresses that all share the same TLS setting

func (*MultiEndpointGRPCClient) Close added in v0.6.1

func (c *MultiEndpointGRPCClient) Close()

Close shuts down the client and all connections

func (*MultiEndpointGRPCClient) ForceRotate added in v0.6.1

func (c *MultiEndpointGRPCClient) ForceRotate() error

ForceRotate immediately rotates to the next endpoint
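
The rotation order used by ForceRotate is not specified on this page. A simple round-robin policy, paired with an accessor shaped like GetCurrentEndpoint, could look like the sketch below. The `rotator` type is hypothetical, it only advances an index rather than dialling a connection, and returning an error when there is no alternative endpoint is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// rotator tracks which endpoint in a fixed list is currently active.
type rotator struct {
	mu        sync.Mutex
	endpoints []string
	current   int
}

// newRotator copies the endpoint list and starts at index 0.
func newRotator(endpoints []string) (*rotator, error) {
	if len(endpoints) == 0 {
		return nil, errors.New("at least one endpoint is required")
	}
	return &rotator{endpoints: append([]string(nil), endpoints...)}, nil
}

// forceRotate moves to the next endpoint, wrapping around at the end.
func (r *rotator) forceRotate() error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if len(r.endpoints) < 2 {
		return errors.New("no alternative endpoint to rotate to")
	}
	r.current = (r.current + 1) % len(r.endpoints)
	return nil
}

// currentEndpoint returns the active endpoint and its index.
func (r *rotator) currentEndpoint() (string, int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.endpoints[r.current], r.current
}

func main() {
	r, err := newRotator([]string{"node-a:9090", "node-b:9090", "node-c:9090"})
	if err != nil {
		fmt.Println("setup failed:", err)
		return
	}
	for i := 0; i < 4; i++ {
		endpoint, index := r.currentEndpoint()
		fmt.Println(index, endpoint)
		if err := r.forceRotate(); err != nil {
			fmt.Println("rotate failed:", err)
			return
		}
	}
}
```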

func (*MultiEndpointGRPCClient) GetClient added in v0.6.1

func (c *MultiEndpointGRPCClient) GetClient() *grpc.ClientConn

GetClient returns the current gRPC client connection

func (*MultiEndpointGRPCClient) GetCurrentEndpoint added in v0.6.1

func (c *MultiEndpointGRPCClient) GetCurrentEndpoint() (endpoint GRPCEndpointConfig, index int)

GetCurrentEndpoint returns information about the currently connected endpoint

func (*MultiEndpointGRPCClient) SetConnectionTimeout added in v0.6.1

func (c *MultiEndpointGRPCClient) SetConnectionTimeout(timeout time.Duration)

SetConnectionTimeout sets the timeout for connection attempts

func (*MultiEndpointGRPCClient) SetHealthCheckInterval added in v0.6.1

func (c *MultiEndpointGRPCClient) SetHealthCheckInterval(interval time.Duration)

SetHealthCheckInterval allows changing the interval between health checks

type MultiEndpointRPCClient added in v0.6.1

type MultiEndpointRPCClient struct {
	// contains filtered or unexported fields
}

MultiEndpointRPCClient manages multiple RPC endpoints with automatic failover

func NewMultiEndpointRPCClient added in v0.6.1

func NewMultiEndpointRPCClient(
	ctx context.Context,
	logger *zap.Logger,
	endpoints []RPCEndpointConfig,
) (*MultiEndpointRPCClient, error)

NewMultiEndpointRPCClient creates a new client that manages multiple RPC endpoints

func NewMultiEndpointRPCClientWithAddresses added in v0.6.1

func NewMultiEndpointRPCClientWithAddresses(
	ctx context.Context,
	logger *zap.Logger,
	addresses []string,
	websocketPath string,
) (*MultiEndpointRPCClient, error)

NewMultiEndpointRPCClientWithAddresses is a helper that creates a client from a list of addresses that all share the same websocket path
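
One plausible reading of this helper is that it expands each address into an RPCEndpointConfig carrying the shared websocket path before delegating to NewMultiEndpointRPCClient. The sketch below shows only that expansion step. The struct is redeclared locally with the documented fields, `endpointsFromAddresses` is a hypothetical name, leaving APIKey empty is an assumption, and `/websocket` is just an example path.

```go
package main

import "fmt"

// RPCEndpointConfig is redeclared here with the fields documented for the package's type.
type RPCEndpointConfig struct {
	Address       string
	WebsocketPath string
	APIKey        string
}

// endpointsFromAddresses gives every address the same websocket path.
// APIKey stays empty because no key is supplied to the helper.
func endpointsFromAddresses(addresses []string, websocketPath string) []RPCEndpointConfig {
	endpoints := make([]RPCEndpointConfig, 0, len(addresses))
	for _, address := range addresses {
		endpoints = append(endpoints, RPCEndpointConfig{
			Address:       address,
			WebsocketPath: websocketPath,
		})
	}
	return endpoints
}

func main() {
	addresses := []string{"http://node-a:26657", "http://node-b:26657"}
	for i, endpoint := range endpointsFromAddresses(addresses, "/websocket") {
		fmt.Printf("%d: %s%s\n", i, endpoint.Address, endpoint.WebsocketPath)
	}
}
```

When endpoints need different websocket paths or API keys, building the RPCEndpointConfig slice by hand and calling NewMultiEndpointRPCClient directly is the more flexible route.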

func (*MultiEndpointRPCClient) Close added in v0.6.1

func (c *MultiEndpointRPCClient) Close()

Close shuts down the client and all connections

func (*MultiEndpointRPCClient) ForceRotate added in v0.6.1

func (c *MultiEndpointRPCClient) ForceRotate() error

ForceRotate immediately rotates to the next endpoint

func (*MultiEndpointRPCClient) GetClient added in v0.6.1

func (c *MultiEndpointRPCClient) GetClient() *rpchttp.HTTP

GetClient returns the current RPC client

func (*MultiEndpointRPCClient) GetCometClient added in v0.6.1

func (c *MultiEndpointRPCClient) GetCometClient() cometbft.CometRPC

GetCometClient returns the current CometRPC client interface

func (*MultiEndpointRPCClient) GetCurrentEndpoint added in v0.6.1

func (c *MultiEndpointRPCClient) GetCurrentEndpoint() (endpoint RPCEndpointConfig, index int)

GetCurrentEndpoint returns information about the currently connected endpoint

func (*MultiEndpointRPCClient) SetConnectionTimeout added in v0.6.1

func (c *MultiEndpointRPCClient) SetConnectionTimeout(timeout time.Duration)

SetConnectionTimeout sets the timeout for connection attempts

func (*MultiEndpointRPCClient) SetHealthCheckInterval added in v0.6.1

func (c *MultiEndpointRPCClient) SetHealthCheckInterval(interval time.Duration)

SetHealthCheckInterval allows changing the interval between health checks

type RPCEndpointConfig added in v0.6.1

type RPCEndpointConfig struct {
	Address       string
	WebsocketPath string
	APIKey        string
}

RPCEndpointConfig represents an RPC endpoint configuration
