service

package
v1.18.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 19, 2026 License: Apache-2.0 Imports: 93 Imported by: 8

Documentation

Index

Constants

View Source
const EnvEthCallFallbackToLatestDuration = "ETH_CALL_FALLBACK_TO_LATEST_DURATION"

EnvEthCallFallbackToLatestDuration is the environment variable name that, when set, enables a non-deterministic fallback to latest for Ethereum eth_call/eth_getBalance. Its presence requires callers to acknowledge non-determinism via the X-substreams-acknowledge-non-deterministic header.

View Source
const EnvEthCallUseBlockNumberDuration = "ETH_CALL_USE_BLOCK_NUMBER_DURATION"

EnvEthCallUseBlockNumberDuration is the environment variable name that, when set, enables using block number instead of block hash for Ethereum eth_call/eth_getBalance after a specified duration. This is to be used on chains with deterministic behavior but with an archive node that needs the block number to route old queries.

Variables

View Source
var ErrRequestActiveForTooLong = errors.New("request active for too long")
View Source
var IsValidCacheTag = regexp.MustCompile(`^[a-zA-Z0-9_-]+$`).MatchString
View Source
var ValidateRequest = ValidateTier1Request

Deprecated: use ValidateTier1Request

Functions

func ListenTier1 added in v1.1.6

func ListenTier1(
	listenAddresses []string,
	svc *Tier1Service,
	connectSvc *ConnectService,
	infoService pbsubstreamsrpc.EndpointInfoServer,
	infoServiceConnect pbsubstreamsrpcv2connect.EndpointInfoHandler,
	auth dauth.Authenticator,
	logger *zap.Logger,
	healthCheck dgrpcserver.HealthCheck,
	enforceCompression bool,
) (err error)

func ListenTier2 added in v1.1.6

func ListenTier2(
	addr string,
	serviceDiscoveryURL *url.URL,
	svc *Tier2Service,
	auth dauth.Authenticator,
	logger *zap.Logger,
	healthCheck dgrpcserver.HealthCheck,
	enforceCompression bool,
) (err error)

func RequestBackProcessing added in v1.8.0

func RequestBackProcessing(ctx context.Context, logger *zap.Logger, startBlock uint64, stageToProcess int, clientFactory client.InternalClientFactory, jobResult chan error)

func ValidateTier1Request added in v1.6.0

func ValidateTier1Request(request *pbsubstreamsrpc.Request, blockType string) error

ValidateTier1Request is run by the server code.

func ValidateTier2Request added in v1.6.0

func ValidateTier2Request(request *pbssinternal.ProcessRangeRequest) error

Types

type BlockFilter added in v1.6.0

type BlockFilter struct {
	// contains filtered or unexported fields
}

type ConnectService added in v1.18.0

type ConnectService struct {
	// contains filtered or unexported fields
}

func NewService added in v1.18.0

func NewService(inner *Tier1Service) *ConnectService

func (*ConnectService) Blocks added in v1.18.0

func (*ConnectService) BlocksV3 added in v1.18.0

func (*ConnectService) BlocksV4 added in v1.18.0

type ExecutionPlan added in v1.6.0

type ExecutionPlan struct {
	ExistingExecOuts map[string]execout.FileReader
	ExecoutWriters   map[string]*execout.Writer
	ExistingIndices  map[string]map[string]*roaring64.Bitmap
	IndexWriters     map[string]*index.Writer
	RequiredModules  map[string]*pbsubstreams.Module
	StoresToWrite    map[string]struct{}
	Skippable        bool
}

func GetExecutionPlan added in v1.6.0

func GetExecutionPlan(
	ctx context.Context,
	logger *zap.Logger,
	execGraph *exec.Graph,
	stage uint32,
	startBlock uint64,
	stopBlock uint64,
	outputModule string,
	execoutConfigs *execout.Configs,
	indexConfigs *index.Configs,
	storeConfigs store.ConfigMap,
) (plan *ExecutionPlan, err error)

type LiveBackFiller added in v1.8.0

type LiveBackFiller struct {
	RequestBackProcessing RequestBackProcessingFunc
	NextHandler           bstream.Handler
	// contains filtered or unexported fields
}

func NewLiveBackFiller added in v1.8.0

func NewLiveBackFiller(ctx context.Context, nextHandler bstream.Handler, logger *zap.Logger, stageToProcess int, segmentSize uint64, linearHandoff uint64, clientFactory client.InternalClientFactory, requestBackProcessing RequestBackProcessingFunc) *LiveBackFiller

func (*LiveBackFiller) CurrentSegment added in v1.17.9

func (l *LiveBackFiller) CurrentSegment() uint64

func (*LiveBackFiller) ProcessBlock added in v1.8.0

func (l *LiveBackFiller) ProcessBlock(blk *pbbstream.Block, obj interface{}) (err error)

func (*LiveBackFiller) Rewind added in v1.17.9

func (l *LiveBackFiller) Rewind(segment uint64)

func (*LiveBackFiller) Start added in v1.8.0

func (l *LiveBackFiller) Start(ctx context.Context)

type ModuleExecutionConfig added in v1.6.0

type ModuleExecutionConfig struct {
	// contains filtered or unexported fields
}

type NoopHandler added in v1.8.1

type NoopHandler struct {
	// contains filtered or unexported fields
}

func NewNoopHandler added in v1.8.1

func NewNoopHandler(respFunc substreams.ResponseFunc) *NoopHandler

func (NoopHandler) ProcessBlock added in v1.8.1

func (n NoopHandler) ProcessBlock(blk *pbbstream.Block, obj interface{}) (err error)

type Option

type Option func(anyTierService)

func WithBlockExecutionTimeout added in v1.10.0

func WithBlockExecutionTimeout(timeout time.Duration) Option

func WithFoundationalStoreEndpoints added in v1.16.6

func WithFoundationalStoreEndpoints(endpoints map[string]string) Option

func WithMaxConcurrentRequests added in v1.4.0

func WithMaxConcurrentRequests(max uint64) Option

func WithModuleExecutionTracing added in v1.1.4

func WithModuleExecutionTracing() Option

func WithReadinessFunc added in v1.4.0

func WithReadinessFunc(f func(bool)) Option

func WithSegmentExecutionTimeout added in v1.16.5

func WithSegmentExecutionTimeout(timeout time.Duration) Option

Tier2 will completely bail out if a segment execution takes longer than the this.

func WithWASMExtensioner added in v1.5.0

func WithWASMExtensioner(ext wasm.WASMExtensioner) Option

type RequestBackProcessingFunc added in v1.8.0

type RequestBackProcessingFunc = func(ctx context.Context, logger *zap.Logger, startBlock uint64, stageToProcess int, clientFactory client.InternalClientFactory, jobCompleted chan error)

type StreamFactory added in v0.0.21

type StreamFactory struct {
	// contains filtered or unexported fields
}

func (*StreamFactory) GetHeadBlock added in v1.0.2

func (s *StreamFactory) GetHeadBlock() (uint64, error)

func (*StreamFactory) GetRecentFinalBlock added in v0.1.0

func (s *StreamFactory) GetRecentFinalBlock() (uint64, error)

func (*StreamFactory) New added in v0.0.21

func (sf *StreamFactory) New(
	ctx context.Context,
	h bstream.Handler,
	startBlockNum int64,
	stopBlockNum uint64,
	cursor string,
	finalBlocksOnly bool,
	includePartialBlocks bool,
	cursorIsTarget bool,
	logger *zap.Logger,
	extraOpts ...stream.Option,
) (Streamable, error)

type StreamFactoryFunc added in v0.1.0

type StreamFactoryFunc func(ctx context.Context,
	h bstream.Handler,
	startBlockNum int64,
	stopBlockNum uint64,
	cursor string,
	finalBlocksOnly bool,
	includePartialBlocks bool,
	cursorIsTarget bool,
	logger *zap.Logger,
	extraOpts ...stream.Option) (Streamable, error)

type Streamable added in v0.1.0

type Streamable interface {
	Run(ctx context.Context) error
}

type Tier1Service added in v1.0.2

type Tier1Service struct {
	*shutter.Shutter
	// contains filtered or unexported fields
}

func NewTier1 added in v1.0.2

func NewTier1(
	logger *zap.Logger,
	mergedBlocksStore dstore.Store,
	forkedBlocksStore dstore.Store,
	hub *hub.ForkableHub,

	stateStore dstore.Store,
	quickSaveStore dstore.Store,
	defaultCacheTag string,

	parallelSubRequests uint64,
	stateBundleSize uint64,
	blockType string,

	appSetIsReadyState func(isReady bool),
	substreamsClientConfig *client.SubstreamsClientConfig,
	tier2RequestParameters reqctx.Tier2RequestParameters,
	enforceCompression bool,
	activeRequestsSoftLimit int,
	activeRequestsHardLimit int,
	sharedCacheSize uint64,
	outputBufferSize uint64,
	sessionPool dsession.SessionPool,
	foundationalEndpoints map[string]string,
	opts ...Option,
) (*Tier1Service, error)

func TestNewService added in v0.1.0

func TestNewService(runtimeConfig config.RuntimeConfig, linearHandoffBlockNum uint64, streamFactoryFunc StreamFactoryFunc) *Tier1Service

func (*Tier1Service) Blocks added in v1.0.2

func (*Tier1Service) BlocksAny added in v1.17.0

func (s *Tier1Service) BlocksAny(
	ctx context.Context,
	request *pbsubstreamsrpc.Request,
	header http.Header,
	protocol string,
	pkg *pbsubstreams.Package,
	stream grpc.ServerStream,
	supportBuffering bool,
	logger *zap.Logger,
) (runningCtx context.Context, serverErr error, blockErr error)

func (*Tier1Service) BlocksAnyConnect added in v1.18.0

func (s *Tier1Service) BlocksAnyConnect(
	ctx context.Context,
	request *pbsubstreamsrpc.Request,
	header http.Header,
	protocol string,
	pkg *pbsubstreams.Package,
	stream grpc.ServerStream,
	supportBuffering bool,
) (serverErr error)

func (*Tier1Service) BlocksAnyGrpc added in v1.18.0

func (s *Tier1Service) BlocksAnyGrpc(
	ctx context.Context,
	request *pbsubstreamsrpc.Request,
	header http.Header,
	protocol string,
	pkg *pbsubstreams.Package,
	stream grpc.ServerStream,
	supportBuffering bool,
) (serverErr error)

func (*Tier1Service) BlocksV3 added in v1.17.0

func (*Tier1Service) BlocksV4 added in v1.18.0

func (*Tier1Service) TestBlocks added in v1.0.2

func (s *Tier1Service) TestBlocks(ctx context.Context, isSubRequest bool, request *pbsubstreamsrpc.Request, respFunc substreams.ResponseFunc) error

type Tier2Service added in v1.0.2

type Tier2Service struct {
	*shutter.Shutter
	// contains filtered or unexported fields
}

func NewTier2 added in v1.0.2

func NewTier2(
	logger *zap.Logger,
	checkPendingShutdown func() bool,
	opts ...Option,
) (*Tier2Service, error)

func TestNewServiceTier2 added in v1.0.2

func TestNewServiceTier2(moduleExecutionTracing bool, streamFactoryFunc StreamFactoryFunc) *Tier2Service

func (*Tier2Service) ProcessRange added in v1.0.2

func (s *Tier2Service) ProcessRange(request *pbssinternal.ProcessRangeRequest, streamSrv pbssinternal.Substreams_ProcessRangeServer) (serverErr error)

func (*Tier2Service) TestProcessRange added in v1.1.9

func (s *Tier2Service) TestProcessRange(ctx context.Context, request *pbssinternal.ProcessRangeRequest, respFunc substreams.ResponseFunc) error

type UsedFoundationalStore added in v1.17.1

type UsedFoundationalStore struct {
	Identifier string
	ModuleHash string
}

func (*UsedFoundationalStore) MarshalLogObject added in v1.17.1

func (s *UsedFoundationalStore) MarshalLogObject(e zapcore.ObjectEncoder) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL