relaycore

package
v5.6.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 11, 2025 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Index

Constants

View Source
// Cache sizing and expiry settings. The cache that consumes them is not
// visible in this listing.
const (
	CacheMaxCost     = 20000 // each item is expected to cost 1, so this presumably caps the cache at about 20000 items — TODO confirm
	CacheNumCounters = 20000 // NOTE(review): the original note says "expect 2000 items" while the value is 20000 (10x that figure); confirm the ratio is intentional and the note is not stale
	EntryTTL         = 5 * time.Minute // five minutes; presumably the lifetime of a cache entry — TODO confirm
)
View Source
const (
	// MaxCallsPerRelay is fixed at 50; presumably an upper bound on the calls
	// made on behalf of a single relay — TODO confirm against its use site.
	MaxCallsPerRelay = 50
)

Variables

View Source
// Package-level instances, created once when the package is initialized.
var (
	// RelayRetriesManagerInstance holds the value returned by
	// lavaprotocol.NewRelayRetriesManager().
	RelayRetriesManagerInstance = lavaprotocol.NewRelayRetriesManager()
	// RelayProcessorMetrics points at an empty RelayProcessorMetricsMock.
	RelayProcessorMetrics       = &RelayProcessorMetricsMock{}
)
View Source
// RelayCountOnNodeError defaults to 2. It is declared as a variable rather
// than a constant, so it can be reassigned at runtime.
// NOTE(review): presumably the number of relays attempted after a node
// error — confirm against its use site.
var RelayCountOnNodeError = 2

Functions

func SendNodeError

func SendNodeError(relayProcessor *RelayProcessor, provider string, delay time.Duration)

func SendNodeErrorJsonRpc

func SendNodeErrorJsonRpc(relayProcessor *RelayProcessor, provider string, delay time.Duration)

func SendProtocolError

func SendProtocolError(relayProcessor *RelayProcessor, provider string, delay time.Duration, err error)

func SendSuccessResp

func SendSuccessResp(relayProcessor *RelayProcessor, provider string, delay time.Duration)

func SendSuccessRespJsonRpc

func SendSuccessRespJsonRpc(relayProcessor *RelayProcessor, provider string, delay time.Duration)

func UpgradeToArchiveIfNeeded

func UpgradeToArchiveIfNeeded(ctx context.Context, protocolMessage chainlib.ProtocolMessage, archiveStatus *ArchiveStatus, relayParser RelayParserInf, cache RetryHashCacheInf, numberOfRetriesLaunched int, numberOfNodeErrors uint64) chainlib.ProtocolMessage

Static function that determines whether an archive upgrade is needed and returns the appropriate protocol message. It does not require a RelayState object, which avoids the need to create one twice.

Types

type ArchiveStatus

type ArchiveStatus struct {
	// contains filtered or unexported fields
}

func (*ArchiveStatus) Copy

func (as *ArchiveStatus) Copy() *ArchiveStatus

type ChainIdAndApiInterfaceGetter

type ChainIdAndApiInterfaceGetter interface {
	// GetChainIdAndApiInterface returns two strings; going by the method
	// name, the chain ID followed by the API interface — TODO confirm order.
	GetChainIdAndApiInterface() (string, string)
}

ChainIdAndApiInterfaceGetter interface

type Consistency

type Consistency interface {
	// SetSeenBlock takes a block number and the user data it relates to;
	// presumably it stores blockSeen for that user — TODO confirm.
	SetSeenBlock(blockSeen int64, userData common.UserData)
	// GetSeenBlock returns an int64 and a bool for userData; presumably the
	// stored seen block and whether one was found — TODO confirm.
	GetSeenBlock(userData common.UserData) (int64, bool)
	// SetSeenBlockFromKey takes an already-built key instead of user data.
	// The ConsistencyImpl documentation describes it as used on
	// subscription, where the dapp key is stored but the dappId and IP are
	// not kept separately.
	SetSeenBlockFromKey(blockSeen int64, key string)
	// Key returns a string for userData; presumably the same key form that
	// SetSeenBlockFromKey accepts — TODO confirm.
	Key(userData common.UserData) string
}

Consistency interface for managing block consistency

func NewConsistency

func NewConsistency(specId string) Consistency

type ConsistencyImpl

type ConsistencyImpl struct {
	// contains filtered or unexported fields
}

ConsistencyImpl is the default implementation of Consistency

func (*ConsistencyImpl) GetLatestBlock

func (cc *ConsistencyImpl) GetLatestBlock(key string) (block int64, found bool)

func (*ConsistencyImpl) GetSeenBlock

func (cc *ConsistencyImpl) GetSeenBlock(userData common.UserData) (int64, bool)

func (*ConsistencyImpl) Key

func (cc *ConsistencyImpl) Key(userData common.UserData) string

func (*ConsistencyImpl) SetLatestBlock

func (cc *ConsistencyImpl) SetLatestBlock(key string, block int64)

func (*ConsistencyImpl) SetSeenBlock

func (cc *ConsistencyImpl) SetSeenBlock(blockSeen int64, userData common.UserData)

func (*ConsistencyImpl) SetSeenBlockFromKey

func (cc *ConsistencyImpl) SetSeenBlockFromKey(blockSeen int64, key string)

Used on subscription, where we already have the dapp key stored but do not keep the dappId and IP separately.

type MetricsInterface

type MetricsInterface interface {
	// SetRelayNodeErrorMetric takes the provider address, chain ID and API
	// interface; presumably it records a node error for that provider —
	// TODO confirm.
	SetRelayNodeErrorMetric(providerAddress string, chainId string, apiInterface string)
	// SetNodeErrorRecoveredSuccessfullyMetric takes the chain ID, API
	// interface and an attempt label. Note that attempt is a string, not a
	// number.
	SetNodeErrorRecoveredSuccessfullyMetric(chainId string, apiInterface string, attempt string)
	// SetProtocolErrorRecoveredSuccessfullyMetric has the same parameters as
	// SetNodeErrorRecoveredSuccessfullyMetric; by its name it is the
	// protocol-error counterpart.
	SetProtocolErrorRecoveredSuccessfullyMetric(chainId string, apiInterface string, attempt string)
}

MetricsInterface for relay processor metrics

type QoSAvailabilityDegrader

type QoSAvailabilityDegrader interface {
	// DegradeAvailability takes an epoch and a session ID and returns
	// nothing; presumably it lowers the availability score of that session
	// — TODO confirm with the implementation.
	DegradeAvailability(epoch uint64, sessionId int64)
}

QoSAvailabilityDegrader interface for QoS management

type RelayError

type RelayError struct {
	// Err is the error value; GetError is documented as returning the
	// underlying error.
	Err          error
	// ProviderInfo is the common.ProviderInfo attached to this error.
	ProviderInfo common.ProviderInfo
	// Response is the related RelayResponse, which has its own Err field.
	// The TODO on this type notes that the error and provider info are
	// stored twice as a result.
	Response     *RelayResponse
}

TODO: there's no need to save error twice and provider info twice, this can just be a RelayResponse

func (RelayError) GetError

func (re RelayError) GetError() error

GetError returns the underlying error

func (RelayError) String

func (re RelayError) String() string

type RelayErrors

type RelayErrors struct {
	// RelayErrors is the list of collected errors; AddError is documented as
	// adding a new error to the collection.
	RelayErrors       []RelayError
	// OnFailureMergeAll is a boolean switch.
	// NOTE(review): presumably makes GetBestErrorMessageForUser merge all
	// errors instead of picking one — confirm with the implementation.
	OnFailureMergeAll bool
}

func (*RelayErrors) AddError

func (r *RelayErrors) AddError(err RelayError)

AddError adds a new error to the RelayErrors collection

func (*RelayErrors) GetBestErrorMessageForUser

func (r *RelayErrors) GetBestErrorMessageForUser() RelayError

type RelayParserInf

type RelayParserInf interface {
	// ParseRelay takes the raw request description (url, req,
	// connectionType), the caller identity (dappID, consumerIp) and a
	// metadata list, and returns a chainlib.ProtocolMessage or an error.
	// The exact meaning and format of each string is not visible here.
	ParseRelay(
		ctx context.Context,
		url string,
		req string,
		connectionType string,
		dappID string,
		consumerIp string,
		metadata []pairingtypes.Metadata,
	) (protocolMessage chainlib.ProtocolMessage, err error)
}

type RelayProcessor

type RelayProcessor struct {
	ResultsManager
	RelayStateMachine
	// contains filtered or unexported fields
}

func NewRelayProcessor

func NewRelayProcessor(
	ctx context.Context,
	quorumParams common.QuorumParams,
	consistency Consistency,
	metricsInf MetricsInterface,
	chainIdAndApiInterfaceGetter ChainIdAndApiInterfaceGetter,
	relayRetriesManager *lavaprotocol.RelayRetriesManager,
	relayStateMachine RelayStateMachine,
	availabilityDegrader QoSAvailabilityDegrader,
) *RelayProcessor

func (*RelayProcessor) GetAllowSessionDegradation

func (rp *RelayProcessor) GetAllowSessionDegradation() bool

Returns true if we never got an extension (the default value).

func (*RelayProcessor) GetQuorumParams

func (rp *RelayProcessor) GetQuorumParams() common.QuorumParams

func (*RelayProcessor) GetSkipDataReliability

func (rp *RelayProcessor) GetSkipDataReliability() bool

func (*RelayProcessor) GetStatefulRelayTargets

func (rp *RelayProcessor) GetStatefulRelayTargets() []string

GetStatefulRelayTargets returns the list of providers that received a stateful relay

func (*RelayProcessor) GetUsedProviders

func (rp *RelayProcessor) GetUsedProviders() *lavasession.UsedProviders

func (*RelayProcessor) HasRequiredNodeResults

func (rp *RelayProcessor) HasRequiredNodeResults(tries int) (bool, int)

func (*RelayProcessor) HasUnsupportedMethodErrors

func (rp *RelayProcessor) HasUnsupportedMethodErrors() bool

HasUnsupportedMethodErrors checks if any of the current errors are unsupported method errors

func (*RelayProcessor) NodeResults

func (rp *RelayProcessor) NodeResults() []common.RelayResult

this function returns all results that came from a node, meaning success, and node errors

func (*RelayProcessor) ProcessingResult

func (rp *RelayProcessor) ProcessingResult() (returnedResult *common.RelayResult, processingError error)

This function returns the results according to the defined strategy. Results were stored in WaitForResults; this function holds the logic that selects which results are returned to the user. It returns an error if we did not meet the quota of replies. If we did, we follow the strategies: if the return strategy is get_first, return the first success or, if there is none, the best node error; if the strategy is quorum, get the majority of node responses. On error, we return a placeholder relayResult with a provider address and a status code.

func (*RelayProcessor) SetDisallowDegradation

func (rp *RelayProcessor) SetDisallowDegradation()

in case we had an extension and managed to get a session successfully, we prevent session degradation.

func (*RelayProcessor) SetResponse

func (rp *RelayProcessor) SetResponse(response *RelayResponse)

func (*RelayProcessor) SetSkipDataReliability

func (rp *RelayProcessor) SetSkipDataReliability(val bool)

func (*RelayProcessor) SetStatefulRelayTargets

func (rp *RelayProcessor) SetStatefulRelayTargets(providers []string)

SetStatefulRelayTargets stores the list of providers that received a stateful relay

func (*RelayProcessor) String

func (rp *RelayProcessor) String() string

func (*RelayProcessor) WaitForResults

func (rp *RelayProcessor) WaitForResults(ctx context.Context) error

This function waits for the processing results, which are written by multiple goroutines and read by this goroutine. It then updates the responses in their respective places: node errors, protocol errors, or success results.

type RelayProcessorMetricsMock

// RelayProcessorMetricsMock is an empty struct with no fields. Its listed
// methods include the three MetricsInterface methods and
// GetChainIdAndApiInterface.
type RelayProcessorMetricsMock struct{}

func (*RelayProcessorMetricsMock) GetChainIdAndApiInterface

func (romm *RelayProcessorMetricsMock) GetChainIdAndApiInterface() (string, string)

func (*RelayProcessorMetricsMock) SetNodeErrorRecoveredSuccessfullyMetric

func (romm *RelayProcessorMetricsMock) SetNodeErrorRecoveredSuccessfullyMetric(chainId string, apiInterface string, attempt string)

func (*RelayProcessorMetricsMock) SetProtocolErrorRecoveredSuccessfullyMetric

func (romm *RelayProcessorMetricsMock) SetProtocolErrorRecoveredSuccessfullyMetric(chainId string, apiInterface string, attempt string)

func (*RelayProcessorMetricsMock) SetRelayNodeErrorMetric

func (romm *RelayProcessorMetricsMock) SetRelayNodeErrorMetric(providerAddress, chainId, apiInterface string)

func (*RelayProcessorMetricsMock) SetRelaySentByNewBatchTickerMetric

func (romm *RelayProcessorMetricsMock) SetRelaySentByNewBatchTickerMetric(chainId string, apiInterface string)

type RelayResponse

type RelayResponse struct {
	// RelayResult is the common.RelayResult carried by this response.
	RelayResult common.RelayResult
	// Err is the error associated with this response; presumably nil on
	// success — TODO confirm.
	Err         error
}

RelayResponse represents a response from a relay operation

type RelayState

type RelayState struct {
	// contains filtered or unexported fields
}

func GetEmptyRelayState

func GetEmptyRelayState(ctx context.Context, protocolMessage chainlib.ProtocolMessage) *RelayState

func NewRelayState

func NewRelayState(ctx context.Context, protocolMessage chainlib.ProtocolMessage, stateNumber int, cache RetryHashCacheInf, relayParser RelayParserInf, archiveStatus *ArchiveStatus) *RelayState

func (*RelayState) CheckIsArchive

func (rs *RelayState) CheckIsArchive(relayRequestData *pairingtypes.RelayPrivateData) bool

func (*RelayState) GetArchiveStatus

func (rs *RelayState) GetArchiveStatus() *ArchiveStatus

func (*RelayState) GetIsArchive

func (rs *RelayState) GetIsArchive() bool

func (*RelayState) GetIsEarliestUsed

func (rs *RelayState) GetIsEarliestUsed() bool

func (*RelayState) GetIsUpgraded

func (rs *RelayState) GetIsUpgraded() bool

func (*RelayState) GetProtocolMessage

func (rs *RelayState) GetProtocolMessage() chainlib.ProtocolMessage

func (*RelayState) GetStateNumber

func (rs *RelayState) GetStateNumber() int

func (*RelayState) SetIsArchive

func (rs *RelayState) SetIsArchive(isArchive bool)

func (*RelayState) SetIsEarliestUsed

func (rs *RelayState) SetIsEarliestUsed()

func (*RelayState) SetProtocolMessage

func (rs *RelayState) SetProtocolMessage(protocolMessage chainlib.ProtocolMessage)

func (*RelayState) UpgradeToArchiveIfNeeded

func (rs *RelayState) UpgradeToArchiveIfNeeded(numberOfRetriesLaunched int, numberOfNodeErrors uint64)

Legacy method wrapper for backward compatibility

type RelayStateMachine

type RelayStateMachine interface {
	// GetProtocolMessage returns a chainlib.ProtocolMessage.
	GetProtocolMessage() chainlib.ProtocolMessage
	// GetDebugState returns a bool; presumably whether debug mode is on —
	// TODO confirm.
	GetDebugState() bool
	// GetRelayTaskChannel returns a channel of RelayStateSendInstructions,
	// or an error.
	GetRelayTaskChannel() (chan RelayStateSendInstructions, error)
	// UpdateBatch takes an error and returns nothing; how the error is used
	// is not visible here.
	UpdateBatch(err error)
	// GetSelection returns a Selection (Quorum or BestResult).
	GetSelection() Selection
	// GetUsedProviders returns a *lavasession.UsedProviders.
	GetUsedProviders() *lavasession.UsedProviders
	// SetResultsChecker injects the ResultsCheckerInf to use.
	SetResultsChecker(resultsChecker ResultsCheckerInf)
	// SetRelayRetriesManager injects the relay retries manager to use.
	SetRelayRetriesManager(relayRetriesManager *lavaprotocol.RelayRetriesManager)
}

RelayStateMachine interface for managing relay state

type RelayStateSendInstructions

type RelayStateSendInstructions struct {
	// Analytics is a pointer to the relay metrics for this instruction.
	Analytics      *metrics.RelayMetrics
	// Err is an error carried with the instruction.
	Err            error
	// Done is a completion flag; presumably the value IsDone reports —
	// TODO confirm.
	Done           bool
	// RelayState is the relay state to send with.
	RelayState     *RelayState
	// NumOfProviders is a provider count; presumably how many providers to
	// send to — TODO confirm.
	NumOfProviders int
}

RelayStateSendInstructions struct for relay instructions

func (*RelayStateSendInstructions) IsDone

func (rssi *RelayStateSendInstructions) IsDone() bool

type ResultsCheckerInf

// ResultsCheckerInf lists three methods that also appear, with the same
// signatures, in the method list of *RelayProcessor. It is the type
// accepted by RelayStateMachine.SetResultsChecker.
type ResultsCheckerInf interface {
	// WaitForResults takes a context and returns an error.
	WaitForResults(ctx context.Context) error
	// HasRequiredNodeResults takes a try count and returns a bool and an
	// int; the meaning of the int is not visible here.
	HasRequiredNodeResults(tries int) (bool, int)
	// GetQuorumParams returns the common.QuorumParams in use.
	GetQuorumParams() common.QuorumParams
}

ResultsCheckerInf interface for checking results

type ResultsManager

// ResultsManager is embedded in RelayProcessor. NewResultsManager returns
// one, and ResultsManagerInst lists methods with the same signatures.
type ResultsManager interface {
	// String returns a string representation.
	String() string
	// NodeResults returns all results that came from a node, meaning
	// successes and node errors (per the ResultsManagerInst docs).
	NodeResults() []common.RelayResult
	// RequiredResults takes a required success count and a Selection and
	// returns a bool.
	RequiredResults(requiredSuccesses int, selection Selection) bool
	// ProtocolErrors returns a uint64; presumably the protocol error count
	// — TODO confirm.
	ProtocolErrors() uint64
	// HasResults defines whether the manager should be used to return the
	// result (per the ResultsManagerInst docs).
	HasResults() bool
	// GetResults returns four counts: success, nodeErrors,
	// specialNodeErrors and protocolErrors.
	GetResults() (success int, nodeErrors int, specialNodeErrors int, protocolErrors int)
	// GetResultsData returns the success results, the node-error results
	// and the protocol errors as three slices.
	GetResultsData() (successResults []common.RelayResult, nodeErrors []common.RelayResult, protocolErrors []RelayError)
	// SetResponse takes a response and its protocol message and returns a
	// node error, if any.
	SetResponse(response *RelayResponse, protocolMessage chainlib.ProtocolMessage) (nodeError error)
	// GetBestNodeErrorMessageForUser returns a single RelayError chosen
	// from the node errors.
	GetBestNodeErrorMessageForUser() RelayError
	// GetBestProtocolErrorMessageForUser returns a single RelayError chosen
	// from the protocol errors.
	GetBestProtocolErrorMessageForUser() RelayError
	// NodeErrors returns the node-error results. The ResultsManagerInst
	// docs mark it "only when locked".
	// NOTE(review): presumably the caller must already hold the manager's
	// lock — confirm.
	NodeErrors() (ret []common.RelayResult)
}

func NewResultsManager

func NewResultsManager(guid uint64) ResultsManager

type ResultsManagerInst

type ResultsManagerInst struct {
	// contains filtered or unexported fields
}

func (*ResultsManagerInst) GetBestNodeErrorMessageForUser

func (rp *ResultsManagerInst) GetBestNodeErrorMessageForUser() RelayError

func (*ResultsManagerInst) GetBestProtocolErrorMessageForUser

func (rp *ResultsManagerInst) GetBestProtocolErrorMessageForUser() RelayError

func (*ResultsManagerInst) GetResults

func (rm *ResultsManagerInst) GetResults() (success int, nodeErrors int, specialNodeErrors int, protocolErrors int)

func (*ResultsManagerInst) GetResultsData

func (rp *ResultsManagerInst) GetResultsData() (successResults []common.RelayResult, nodeErrors []common.RelayResult, protocolErrors []RelayError)

func (*ResultsManagerInst) HasResults

func (rp *ResultsManagerInst) HasResults() bool

This function reports whether we should use the manager to return the result (meaning it has some insight and responses) or just return to the user.

func (*ResultsManagerInst) NodeErrors

func (rp *ResultsManagerInst) NodeErrors() (ret []common.RelayResult)

only when locked

func (*ResultsManagerInst) NodeResults

func (rp *ResultsManagerInst) NodeResults() []common.RelayResult

this function returns all results that came from a node, meaning success, and node errors

func (*ResultsManagerInst) ProtocolErrors

func (rp *ResultsManagerInst) ProtocolErrors() uint64

func (*ResultsManagerInst) RequiredResults

func (rp *ResultsManagerInst) RequiredResults(requiredSuccesses int, selection Selection) bool

func (*ResultsManagerInst) SetResponse

func (rp *ResultsManagerInst) SetResponse(response *RelayResponse, protocolMessage chainlib.ProtocolMessage) (nodeError error)

func (*ResultsManagerInst) String

func (rm *ResultsManagerInst) String() string

type RetryHashCacheInf

type RetryHashCacheInf interface {
	// CheckHashInCache takes a hash string and returns a bool; presumably
	// true when the hash is present — TODO confirm.
	CheckHashInCache(hash string) bool
	// AddHashToCache takes a hash string and returns nothing.
	AddHashToCache(hash string)
}

type Selection

// Selection is an int-backed enum with two values, numbered from zero by
// iota: Quorum is 0 and BestResult is 1.
type Selection int
const (
	Quorum     Selection = iota // get the majority out of requiredSuccesses
	BestResult                  // get the best result, even if it means waiting
)

Selection enum; do not add other constants.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL