flow

package
v0.0.0-...-da4b772 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 7, 2026 License: MIT Imports: 35 Imported by: 0

Documentation

Index

Constants

View Source
const NoUpstream = "NoUpstream"

Variables

View Source
var ResultTrue = []byte(`true`)

Functions

This section is empty.

Types

type AvailabilityResponse

type AvailabilityResponse struct {
}

func (AvailabilityResponse) Cause

func (a AvailabilityResponse) Cause() string

func (AvailabilityResponse) Type

type BaseExecutionFlow

type BaseExecutionFlow struct {
	// contains filtered or unexported fields
}

func NewBaseExecutionFlow

func NewBaseExecutionFlow(
	chain chains.Chain,
	upstreamSupervisor upstreams.UpstreamSupervisor,
	cacheProcessor caches.CacheProcessor,
	registry *rating.RatingRegistry,
	appConfig *config.AppConfig,
	subCtx *SubCtx,
	quorumRegistry *quorum.Registry,
) *BaseExecutionFlow

func (*BaseExecutionFlow) AddHooks

func (e *BaseExecutionFlow) AddHooks(hooks ...any)

func (*BaseExecutionFlow) Execute

func (e *BaseExecutionFlow) Execute(ctx context.Context, requests []protocol.RequestHolder)

func (*BaseExecutionFlow) GetResponses

func (e *BaseExecutionFlow) GetResponses() chan *protocol.ResponseHolderWrapper

type BaseStrategy

type BaseStrategy struct {
	// contains filtered or unexported fields
}

func NewBaseStrategy

func NewBaseStrategy(chainSupervisor upstreams.ChainSupervisor) *BaseStrategy

func (*BaseStrategy) SelectUpstream

func (b *BaseStrategy) SelectUpstream(request protocol.RequestHolder) (string, error)

type EthBlockNumberIntegrityHandler

type EthBlockNumberIntegrityHandler struct {
}

func NewEthBlockNumberIntegrityHandler

func NewEthBlockNumberIntegrityHandler() *EthBlockNumberIntegrityHandler

func (*EthBlockNumberIntegrityHandler) CanBeProcessed

func (*EthBlockNumberIntegrityHandler) HandleResponse

type EthGetBlockByNumberIntegrityHandler

type EthGetBlockByNumberIntegrityHandler struct {
}

func NewEthGetBlockByNumberIntegrityHandler

func NewEthGetBlockByNumberIntegrityHandler() *EthGetBlockByNumberIntegrityHandler

func (*EthGetBlockByNumberIntegrityHandler) CanBeProcessed

func (*EthGetBlockByNumberIntegrityHandler) HandleResponse

type ExecutionFlow

type ExecutionFlow interface {
	Execute(ctx context.Context, requests []protocol.RequestHolder)
	GetResponses() chan *protocol.ResponseHolderWrapper
	AddHooks(hooks ...any)
}

type FailingStrategy

type FailingStrategy struct {
	// contains filtered or unexported fields
}

FailingStrategy is a sentinel strategy that returns the same preset error for every SelectUpstream call. It is used by createStrategy to surface policy errors (e.g. quorum-not-supported) to the client without tying the check to a specific upstream selection path.

func NewFailingStrategy

func NewFailingStrategy(err error) *FailingStrategy

func (*FailingStrategy) SelectUpstream

func (f *FailingStrategy) SelectUpstream(_ protocol.RequestHolder) (string, error)

type FinalizedBlock

type FinalizedBlock struct {
	// contains filtered or unexported fields
}

func NewFinalizedBlock

func NewFinalizedBlock(number uint64) *FinalizedBlock

type HeadBlock

type HeadBlock struct {
	// contains filtered or unexported fields
}

func NewHeadBlock

func NewHeadBlock(number uint64) *HeadBlock

type IntegrityBlock

type IntegrityBlock interface {
	// contains filtered or unexported methods
}

type IntegrityHandler

type IntegrityHandler interface {
	CanBeProcessed(ctx context.Context, request protocol.RequestHolder) bool
	// HandleResponse processes a response from an upstream and determines whether additional actions are required to support integrity. It returns:
	//
	// bool – whether an additional request should be sent.
	//
	// []string – when the bool is true, the list of upstream IDs where the additional request should be executed.
	//
	// IntegrityBlock – an object used to update chain state manually (e.g., replacing the tracked head block or finalization data) when necessary.
	HandleResponse(
		ctx context.Context,
		chainSupervisor upstreams.ChainSupervisor,
		request protocol.RequestHolder,
		currentResponse *protocol.ResponseHolderWrapper,
	) (bool, []string, IntegrityBlock)
}

type IntegrityRequestProcessor

type IntegrityRequestProcessor struct {
	// contains filtered or unexported fields
}

func NewIntegrityRequestProcessor

func NewIntegrityRequestProcessor(
	chain chains.Chain,
	upstreamSupervisor upstreams.UpstreamSupervisor,
	requestProcessor RequestProcessor,
) *IntegrityRequestProcessor

func (*IntegrityRequestProcessor) ProcessRequest

func (i *IntegrityRequestProcessor) ProcessRequest(
	ctx context.Context,
	upstreamStrategy UpstreamStrategy,
	request protocol.RequestHolder,
) ProcessedResponse

type LocalRequestProcessor

type LocalRequestProcessor struct {
	// contains filtered or unexported fields
}

func NewLocalRequestProcessor

func NewLocalRequestProcessor(chain chains.Chain, subCtx *SubCtx) *LocalRequestProcessor

func (*LocalRequestProcessor) ProcessRequest

type MatchResponse

type MatchResponse interface {
	Cause() string
	Type() MatchResponseType
}

type MatchResponseType

type MatchResponseType int
const (
	MethodType MatchResponseType = iota
	AvailabilityType
	RateLimiterType
	UpstreamIndexType
	SuccessType
)

type Matcher

type Matcher interface {
	Match(string, *protocol.UpstreamState) MatchResponse
}

type MethodBanHook

type MethodBanHook struct {
	// contains filtered or unexported fields
}

func NewMethodBanHook

func NewMethodBanHook(upstreamSupervisor upstreams.UpstreamSupervisor) *MethodBanHook

func (*MethodBanHook) OnResponseReceived

func (m *MethodBanHook) OnResponseReceived(_ context.Context, request protocol.RequestHolder, respWrapper *protocol.ResponseHolderWrapper)

type MethodMatcher

type MethodMatcher struct {
	// contains filtered or unexported fields
}

func NewMethodMatcher

func NewMethodMatcher(method string) *MethodMatcher

func (*MethodMatcher) Match

type MethodResponse

type MethodResponse struct {
	// contains filtered or unexported fields
}

func (MethodResponse) Cause

func (m MethodResponse) Cause() string

func (MethodResponse) Type

type MultiMatcher

type MultiMatcher struct {
	// contains filtered or unexported fields
}

func NewMultiMatcher

func NewMultiMatcher(matchers ...Matcher) *MultiMatcher

func (*MultiMatcher) Match

func (m *MultiMatcher) Match(upId string, state *protocol.UpstreamState) MatchResponse

type NoopIntegrityHandler

type NoopIntegrityHandler struct{}

func NewNoopIntegrityHandler

func NewNoopIntegrityHandler() *NoopIntegrityHandler

func (*NoopIntegrityHandler) CanBeProcessed

type ProcessedResponse

type ProcessedResponse interface {
	// contains filtered or unexported methods
}

type RateLimiterResponse

type RateLimiterResponse struct {
}

func (RateLimiterResponse) Cause

func (r RateLimiterResponse) Cause() string

func (RateLimiterResponse) Type

type RatingStrategy

type RatingStrategy struct {
	// contains filtered or unexported fields
}

func NewRatingStrategy

func NewRatingStrategy(
	chain chains.Chain,
	method string,
	additionalMatchers []Matcher,
	chainSupervisor upstreams.ChainSupervisor,
	registry *rating.RatingRegistry,
) *RatingStrategy

func (*RatingStrategy) SelectUpstream

func (r *RatingStrategy) SelectUpstream(request protocol.RequestHolder) (string, error)

type RequestProcessor

type RequestProcessor interface {
	ProcessRequest(ctx context.Context, upstreamStrategy UpstreamStrategy, request protocol.RequestHolder) ProcessedResponse
}

type SpecificOrderUpstreamStrategy

type SpecificOrderUpstreamStrategy struct {
	// contains filtered or unexported fields
}

func NewSpecificOrderUpstreamStrategy

func NewSpecificOrderUpstreamStrategy(upstreamIds []string, chainSupervisor upstreams.ChainSupervisor) *SpecificOrderUpstreamStrategy

func (*SpecificOrderUpstreamStrategy) SelectUpstream

func (s *SpecificOrderUpstreamStrategy) SelectUpstream(request protocol.RequestHolder) (string, error)

type StatusMatcher

type StatusMatcher struct{}

func NewStatusMatcher

func NewStatusMatcher() *StatusMatcher

func (*StatusMatcher) Match

type StickyRequestProcessor

type StickyRequestProcessor struct {
	// contains filtered or unexported fields
}

func NewStickyRequestProcessor

func NewStickyRequestProcessor(chain chains.Chain, upstreamSupervisor upstreams.UpstreamSupervisor) *StickyRequestProcessor

func (*StickyRequestProcessor) ProcessRequest

func (s *StickyRequestProcessor) ProcessRequest(
	ctx context.Context,
	upstreamStrategy UpstreamStrategy,
	request protocol.RequestHolder,
) ProcessedResponse

type SubCtx

type SubCtx struct {
	// contains filtered or unexported fields
}

func NewSubCtx

func NewSubCtx() *SubCtx

func (*SubCtx) AddSub

func (s *SubCtx) AddSub(sub string, cancel context.CancelFunc)

func (*SubCtx) Exists

func (s *SubCtx) Exists(sub string) bool

func (*SubCtx) IsSubscriptionResultOnly

func (s *SubCtx) IsSubscriptionResultOnly() bool

func (*SubCtx) Unsubscribe

func (s *SubCtx) Unsubscribe(sub string)

func (*SubCtx) WithSubscriptionResultOnly

func (s *SubCtx) WithSubscriptionResultOnly(enabled bool) *SubCtx

type SubscriptionRequestProcessor

type SubscriptionRequestProcessor struct {
	// contains filtered or unexported fields
}

func NewSubscriptionRequestProcessor

func NewSubscriptionRequestProcessor(upstreamSupervisor upstreams.UpstreamSupervisor, subCtx *SubCtx) *SubscriptionRequestProcessor

func (*SubscriptionRequestProcessor) ProcessRequest

func (s *SubscriptionRequestProcessor) ProcessRequest(
	ctx context.Context,
	upstreamStrategy UpstreamStrategy,
	request protocol.RequestHolder,
) ProcessedResponse

type SubscriptionResponse

type SubscriptionResponse struct {
	ResponseWrappers chan *protocol.ResponseHolderWrapper
}

type SuccessResponse

type SuccessResponse struct {
}

func (SuccessResponse) Cause

func (s SuccessResponse) Cause() string

func (SuccessResponse) Type

type UnaryRequestProcessor

type UnaryRequestProcessor struct {
	// contains filtered or unexported fields
}

func NewUnaryRequestProcessor

func NewUnaryRequestProcessor(chain chains.Chain, cacheProcessor caches.CacheProcessor, upstreamSupervisor upstreams.UpstreamSupervisor) *UnaryRequestProcessor

func (*UnaryRequestProcessor) ProcessRequest

func (u *UnaryRequestProcessor) ProcessRequest(
	ctx context.Context,
	upstreamStrategy UpstreamStrategy,
	request protocol.RequestHolder,
) ProcessedResponse

type UnaryResponse

type UnaryResponse struct {
	ResponseWrapper *protocol.ResponseHolderWrapper
}

type UpstreamIndexMatcher

type UpstreamIndexMatcher struct {
	// contains filtered or unexported fields
}

func NewUpstreamIndexMatcher

func NewUpstreamIndexMatcher(upstreamIndex string) *UpstreamIndexMatcher

func (*UpstreamIndexMatcher) Match

type UpstreamIndexResponse

type UpstreamIndexResponse struct {
	// contains filtered or unexported fields
}

func (UpstreamIndexResponse) Cause

func (u UpstreamIndexResponse) Cause() string

func (UpstreamIndexResponse) Type

type UpstreamStrategy

type UpstreamStrategy interface {
	SelectUpstream(request protocol.RequestHolder) (string, error)
}

type WsCapMatcher

type WsCapMatcher struct {
	// contains filtered or unexported fields
}

func NewWsCapMatcher

func NewWsCapMatcher(method string) *WsCapMatcher

func (*WsCapMatcher) Match

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL