subscriber

package
v0.1.43 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 18, 2026 License: GPL-3.0 Imports: 26 Imported by: 0

Documentation

Overview

Package subscriber: batch log filterer to merge multiple eth_getLogs into one JSON-RPC batch request.

Package subscriber: merge multiple FilterQueries into one so a single eth_getLogs (one JSON-RPC request) can be used instead of N, minimizing RPC cost per cycle.

Index

Constants

const DefaultConsecutiveZeroLogThreshold = 3

DefaultConsecutiveZeroLogThreshold is the number of consecutive scans returning 0 logs after which the scanner force-advances FromBlock. This keeps the scanner from getting stuck when there are genuinely no events for an extended period, which would otherwise let the scan range grow without bound and exceed RPC block range limits.

The unexported runRealtimeScanner runs one loop per chain: it collects all realtime queries, issues one FilterLogsBatch, then dispatches the results.
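The threshold logic can be pictured with a small counter. The following is only a sketch under stated assumptions: `scanState` and `afterScan` are hypothetical names, not part of this package, and the rule that a non-empty scan advances FromBlock past the scanned range is assumed for illustration.

```go
package main

// scanState is a hypothetical stand-in for the scanner's per-query bookkeeping.
type scanState struct {
	FromBlock    uint64 // first block of the next scan
	ZeroLogScans int    // consecutive scans that returned no logs
	Threshold    int    // e.g. DefaultConsecutiveZeroLogThreshold (3)
}

// afterScan updates the state after scanning [s.FromBlock, toBlock] and
// reports whether FromBlock was force-advanced because of empty scans.
func (s *scanState) afterScan(toBlock uint64, logCount int) bool {
	if logCount > 0 {
		// Assumed behavior: logs were found, so reset the counter and
		// continue right after the scanned range.
		s.ZeroLogScans = 0
		s.FromBlock = toBlock + 1
		return false
	}
	s.ZeroLogScans++
	if s.ZeroLogScans >= s.Threshold {
		// Too many empty scans in a row: stop growing the range.
		s.ZeroLogScans = 0
		s.FromBlock = toBlock + 1
		return true
	}
	return false
}
```

With a threshold of 3, two empty scans leave FromBlock untouched (so the range keeps growing as the chain head moves), and the third empty scan moves FromBlock past the scanned range.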

Variables

This section is empty.

Functions

func GetPartitionKey added in v0.1.38

func GetPartitionKey(q ethereum.FilterQuery, fallbackFrom, fallbackTo *big.Int) string

GetPartitionKey returns a stable key for grouping FilterQueries that can be merged into one eth_getLogs. Same key ⇒ same block selector (range or blockHash) ⇒ can merge. For range queries, if FromBlock/ToBlock are nil, fallbackFrom/fallbackTo are used when non-nil; otherwise "nil" is used so nil range does not merge with a concrete range.

func GetQueryHash added in v0.0.2

func GetQueryHash(chainId *big.Int, query ethereum.FilterQuery) common.Hash

func GetQueryKey

func GetQueryKey(chainId *big.Int, query ethereum.FilterQuery) string

func LogMatchesQuery added in v0.1.38

func LogMatchesQuery(log *etypes.Log, q ethereum.FilterQuery) bool

LogMatchesQuery reports whether a log matches the given FilterQuery (address and topics).
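A minimal sketch of address and topic matching, following the usual eth_getLogs filter semantics: an empty address list matches any address, and each topic position is an OR-list where an empty list is a wildcard. The types `logEntry` and `filterQuery` are simplified stand-ins with string fields, and `logMatches` is a hypothetical name; the package's own function may differ in detail.

```go
package main

// logEntry and filterQuery are simplified stand-ins for etypes.Log and
// ethereum.FilterQuery, using strings instead of addresses and hashes.
type logEntry struct {
	Address string
	Topics  []string
}

type filterQuery struct {
	Addresses []string
	Topics    [][]string
}

func contains(set []string, v string) bool {
	for _, s := range set {
		if s == v {
			return true
		}
	}
	return false
}

// logMatches reports whether l satisfies the address and topic filters of q.
func logMatches(l logEntry, q filterQuery) bool {
	// An empty address list matches logs from any contract.
	if len(q.Addresses) > 0 && !contains(q.Addresses, l.Address) {
		return false
	}
	// The log needs at least as many topics as the filter has positions.
	if len(q.Topics) > len(l.Topics) {
		return false
	}
	for i, alternatives := range q.Topics {
		if len(alternatives) == 0 {
			continue // wildcard position
		}
		if !contains(alternatives, l.Topics[i]) {
			return false
		}
	}
	return true
}
```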

func MergeFilterQueries added in v0.1.38

func MergeFilterQueries(queries []ethereum.FilterQuery, fromBlock, toBlock *big.Int) (ethereum.FilterQuery, error)

MergeFilterQueries merges multiple FilterQueries that share the same block range into one FilterQuery: union of Addresses and union of Topics per index. The result is suitable for a single eth_getLogs call; each returned log can then be matched to the original queries via LogMatchesQuery. fromBlock and toBlock are set on the merged query; BlockHash must be nil for all.
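The union rule can be sketched as below. This is a simplified illustration rather than the package's code: `filterQuery` uses strings and plain integers, `mergeQueries` is a hypothetical name, and two details are assumptions: a query without addresses makes the merged address list empty (match any), and topic positions missing from any query are dropped because they act as wildcards.

```go
package main

import "errors"

// filterQuery is a simplified stand-in for ethereum.FilterQuery.
type filterQuery struct {
	FromBlock, ToBlock uint64
	Addresses          []string
	Topics             [][]string
}

// union appends items to dst, skipping values already recorded in seen.
func union(dst []string, seen map[string]bool, items []string) []string {
	for _, it := range items {
		if !seen[it] {
			seen[it] = true
			dst = append(dst, it)
		}
	}
	return dst
}

// mergeQueries builds one query whose result set covers every input query.
func mergeQueries(queries []filterQuery, from, to uint64) (filterQuery, error) {
	if len(queries) == 0 {
		return filterQuery{}, errors.New("no queries to merge")
	}
	merged := filterQuery{FromBlock: from, ToBlock: to}

	// Addresses: union, unless some query matches any address.
	seenAddr := map[string]bool{}
	for _, q := range queries {
		if len(q.Addresses) == 0 {
			merged.Addresses = nil
			break
		}
		merged.Addresses = union(merged.Addresses, seenAddr, q.Addresses)
	}

	// Topics: only positions present in every query can stay constrained.
	n := len(queries[0].Topics)
	for _, q := range queries[1:] {
		if len(q.Topics) < n {
			n = len(q.Topics)
		}
	}
	merged.Topics = make([][]string, n)
	for i := 0; i < n; i++ {
		seen := map[string]bool{}
		var alternatives []string
		for _, q := range queries {
			if len(q.Topics[i]) == 0 {
				alternatives = nil // one wildcard makes the merged position a wildcard
				break
			}
			alternatives = union(alternatives, seen, q.Topics[i])
		}
		merged.Topics[i] = alternatives
	}
	return merged, nil
}
```

Because the merged query is a superset filter, it can return logs that belong to only some of the original queries, which is why each returned log is matched back against the originals afterwards.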

func MergeFilterQueriesByBlockHash added in v0.1.38

func MergeFilterQueriesByBlockHash(queries []ethereum.FilterQuery, blockHash common.Hash) (ethereum.FilterQuery, error)

MergeFilterQueriesByBlockHash merges multiple FilterQueries that share the same BlockHash into one FilterQuery (union of Addresses and Topics). Used for greedy minimization: all queries with the same BlockHash become one eth_getLogs.

func PartitionQueriesByMergeKey added in v0.1.38

func PartitionQueriesByMergeKey(queries []ethereum.FilterQuery, fallbackFrom, fallbackTo *big.Int) [][]ethereum.FilterQuery

PartitionQueriesByMergeKey splits queries into groups that share the same partition key (same block range or same BlockHash). Each group can be merged into one eth_getLogs. fallbackFrom/fallbackTo are used for queries with nil FromBlock/ToBlock when provided.
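Grouping by key can be sketched with a map from key to group index. The example below is an assumption-laden illustration: `filterQuery`, `key`, and `partition` are hypothetical stand-ins, the key format is invented, and preserving first-seen group order is a choice made here, not something the documentation states.

```go
package main

import "fmt"

// filterQuery is a simplified stand-in for ethereum.FilterQuery; Name only
// labels the query for the example.
type filterQuery struct {
	BlockHash string // empty means "range query"
	From, To  int64
	Name      string
}

// key is a hypothetical partition key: block hash if set, else the range.
func key(q filterQuery) string {
	if q.BlockHash != "" {
		return "hash:" + q.BlockHash
	}
	return fmt.Sprintf("range:%d-%d", q.From, q.To)
}

// partition groups queries that share a key, keeping first-seen order.
func partition(queries []filterQuery) [][]filterQuery {
	index := map[string]int{} // key -> position in groups
	var groups [][]filterQuery
	for _, q := range queries {
		k := key(q)
		i, ok := index[k]
		if !ok {
			i = len(groups)
			index[k] = i
			groups = append(groups, nil)
		}
		groups[i] = append(groups[i], q)
	}
	return groups
}
```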

func TopicLayoutKey added in v0.1.40

func TopicLayoutKey(q ethereum.FilterQuery) string

TopicLayoutKey returns a canonical key for the topic filter layout of q, that is, which topic indices are set and which are nil. Only queries that share the same layout should be merged, so that address filters carried in topics are preserved. For example, CT events filter on topic1=stakeholder while OrderFilled filters on topic2=maker; merging the two would set both topic1 and topic2 to nil (if any query has nil at an index, the merged query has nil there) and fetch every event of the protocol. Uses up to 4 topic slots (the standard maximum for indexed params).
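A layout key only needs to record which of the four topic slots carry a filter. The sketch below uses a four-character string of `1` and `0`; this encoding and the name `topicLayoutKey` are assumptions for illustration, not the package's actual format.

```go
package main

// topicLayoutKey returns a 4-character string with '1' for every topic slot
// that has at least one filter value and '0' for every wildcard slot.
// Slots beyond the fourth are ignored.
func topicLayoutKey(topics [][]string) string {
	key := []byte("0000")
	for i := 0; i < len(topics) && i < 4; i++ {
		if len(topics[i]) > 0 {
			key[i] = '1'
		}
	}
	return string(key)
}
```

Under this encoding a query filtering on topic0 and topic1 yields `1100`, while one filtering on topic0 and topic2 yields `1010`, so the two land in different merge groups and neither loses its indexed filter.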

Types

type ChainSubscriber

type ChainSubscriber struct {
	// contains filtered or unexported fields
}

ChainSubscriber implements the Subscriber interface.

func NewChainSubscriber

func NewChainSubscriber(rpcCli *rpc.Client, storage SubscriberStorage) (*ChainSubscriber, error)

NewChainSubscriber creates a ChainSubscriber from the given RPC client and SubscriberStorage.

func (*ChainSubscriber) Close added in v0.0.7

func (s *ChainSubscriber) Close()

func (*ChainSubscriber) FilterLogs

func (cs *ChainSubscriber) FilterLogs(ctx context.Context, q ethereum.FilterQuery) (logs []etypes.Log, err error)

func (*ChainSubscriber) FilterLogsWithChannel

func (cs *ChainSubscriber) FilterLogsWithChannel(ctx context.Context, q ethereum.FilterQuery, logsChan chan<- etypes.Log, watch bool, closeOnExit bool) (err error)

TODO: cache all finalized historical data, e.g., blockByHash, txByHash

func (*ChainSubscriber) GetBlockConfirmationsOnSubscription added in v0.0.2

func (cs *ChainSubscriber) GetBlockConfirmationsOnSubscription() uint64

func (*ChainSubscriber) GetQueryHandler added in v0.0.9

func (cs *ChainSubscriber) GetQueryHandler() QueryHandler

func (*ChainSubscriber) SetBlockConfirmationsOnSubscription added in v0.0.2

func (cs *ChainSubscriber) SetBlockConfirmationsOnSubscription(confirmations uint64)

func (*ChainSubscriber) SetBlocksPerScan added in v0.0.6

func (s *ChainSubscriber) SetBlocksPerScan(blocksPerScan uint64)

func (*ChainSubscriber) SetBuffer added in v0.0.10

func (cs *ChainSubscriber) SetBuffer(buffer int)

func (*ChainSubscriber) SetConsecutiveZeroLogThreshold added in v0.1.43

func (cs *ChainSubscriber) SetConsecutiveZeroLogThreshold(n int)

SetConsecutiveZeroLogThreshold sets how many consecutive 0-log scan cycles are allowed before forcing FromBlock to advance. Default is 3. This prevents the scanner from getting stuck when there are genuinely no events, which would cause the scan range to grow unboundedly and hit RPC block range limits.

func (*ChainSubscriber) SetFetchMissingHeaders added in v0.1.11

func (cs *ChainSubscriber) SetFetchMissingHeaders(enable bool)

func (*ChainSubscriber) SetMaxBlocksPerScan added in v0.1.4

func (s *ChainSubscriber) SetMaxBlocksPerScan(maxBlocksPerScan uint64)

func (*ChainSubscriber) SetMaxQueriesPerMerge added in v0.1.39

func (cs *ChainSubscriber) SetMaxQueriesPerMerge(n int)

SetMaxQueriesPerMerge sets the maximum number of queries to merge into one eth_getLogs. When n > 0 and a partition has more than n queries, the partition is split into batches; each batch is requested separately, and the results are merged and sorted. Use 2 on BSC to avoid nodes returning 0 logs for large merged filters. 0 means no limit (default).
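The split-then-merge step might look like the sketch below. The names `splitBatches`, `mergeAndSort`, and `logEntry` are hypothetical, and sorting by block number and then log index is an assumed ordering; the documentation only says results are "merged and sorted".

```go
package main

import "sort"

// logEntry is a simplified stand-in for etypes.Log with only ordering fields.
type logEntry struct {
	Block uint64
	Index uint
}

// splitBatches cuts queries into chunks of at most max items.
// max <= 0 means no limit, so everything stays in one batch.
func splitBatches(queries []string, max int) [][]string {
	if max <= 0 || len(queries) <= max {
		return [][]string{queries}
	}
	var batches [][]string
	for start := 0; start < len(queries); start += max {
		end := start + max
		if end > len(queries) {
			end = len(queries)
		}
		batches = append(batches, queries[start:end])
	}
	return batches
}

// mergeAndSort flattens per-batch results and orders them by block number,
// then by log index (assumed ordering).
func mergeAndSort(results [][]logEntry) []logEntry {
	var all []logEntry
	for _, r := range results {
		all = append(all, r...)
	}
	sort.Slice(all, func(i, j int) bool {
		if all[i].Block != all[j].Block {
			return all[i].Block < all[j].Block
		}
		return all[i].Index < all[j].Index
	})
	return all
}
```

With five queries and a limit of 2, this produces three batches of sizes 2, 2, and 1.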

func (*ChainSubscriber) SetQueryHandler added in v0.0.7

func (cs *ChainSubscriber) SetQueryHandler(handler QueryHandler)

func (*ChainSubscriber) SetRetryInterval added in v0.0.6

func (s *ChainSubscriber) SetRetryInterval(retryInterval time.Duration)

func (*ChainSubscriber) SubmitQuery added in v0.0.7

func (cs *ChainSubscriber) SubmitQuery(query ethereum.FilterQuery) error

func (*ChainSubscriber) SubscribeFilterFullPendingTransactions added in v0.1.18

func (cs *ChainSubscriber) SubscribeFilterFullPendingTransactions(ctx context.Context, filter FilterTransaction, ch chan<- *etypes.Transaction) (*rpc.ClientSubscription, error)

func (*ChainSubscriber) SubscribeFilterFullTransactions added in v0.1.18

func (cs *ChainSubscriber) SubscribeFilterFullTransactions(ctx context.Context, filter FilterTransaction, ch chan<- *etypes.Transaction) (ethereum.Subscription, error)

func (*ChainSubscriber) SubscribeFilterLogs

func (cs *ChainSubscriber) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- etypes.Log) (sub ethereum.Subscription, err error)

func (*ChainSubscriber) SubscribeFullPendingTransactions added in v0.1.18

func (cs *ChainSubscriber) SubscribeFullPendingTransactions(ctx context.Context, ch chan<- *etypes.Transaction) (*rpc.ClientSubscription, error)

SubscribeFullPendingTransactions subscribes to new pending transactions.

func (*ChainSubscriber) SubscribeNewHead

func (cs *ChainSubscriber) SubscribeNewHead(ctx context.Context, ch chan<- *etypes.Header) (sub ethereum.Subscription, err error)

SubscribeNewHead subscribes to new block headers, delivering them on ch.

func (*ChainSubscriber) SubscribePendingTransactions added in v0.1.18

func (cs *ChainSubscriber) SubscribePendingTransactions(ctx context.Context, ch chan<- common.Hash) (*rpc.ClientSubscription, error)

SubscribePendingTransactions subscribes to new pending transaction hashes.

type FilterTransaction added in v0.1.18

type FilterTransaction struct {
	FromBlock *big.Int // beginning of the queried range, nil means genesis block. only used for historical data
	ToBlock   *big.Int // end of the range, nil means latest block. only used for historical data

	// A transaction is considered if it meets any of these conditions.
	From           []common.Address
	To             []common.Address
	MethodSelector []types.MethodSelector
}

type LogFiltererBatch added in v0.1.37

type LogFiltererBatch interface {
	FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]etypes.Log, error)
	FilterLogsBatch(ctx context.Context, queries []ethereum.FilterQuery) ([][]etypes.Log, error)
}

LogFiltererBatch extends log filtering with a batch call to reduce RPC round-trips. When multiple subscriptions share the same chain and block range, one FilterLogsBatch can replace N separate FilterLogs calls (e.g. one HTTP request via JSON-RPC batch).

func NewBatchLogFilterer added in v0.1.37

func NewBatchLogFilterer(c *ethclient.Client, rpcCli *rpc.Client) LogFiltererBatch

NewBatchLogFilterer returns a LogFiltererBatch that uses BatchCallContext for FilterLogsBatch.

type MemoryStorage

type MemoryStorage struct {
	// contains filtered or unexported fields
}

func NewMemoryStorage

func NewMemoryStorage(chainId *big.Int) *MemoryStorage

func (*MemoryStorage) FilterLogs added in v0.0.11

func (s *MemoryStorage) FilterLogs(ctx context.Context, q ethereum.FilterQuery) (logs []types.Log, err error)

func (*MemoryStorage) IsFilterLogsSupported added in v0.0.11

func (s *MemoryStorage) IsFilterLogsSupported(q ethereum.FilterQuery) bool

func (*MemoryStorage) LatestBlockForQuery

func (s *MemoryStorage) LatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery) (uint64, error)

func (*MemoryStorage) LatestLogForQuery

func (s *MemoryStorage) LatestLogForQuery(ctx context.Context, query ethereum.FilterQuery) (types.Log, error)

func (*MemoryStorage) SaveFilterLogs added in v0.0.11

func (s *MemoryStorage) SaveFilterLogs(q ethereum.FilterQuery, logs []types.Log) (err error)

func (*MemoryStorage) SaveLatestBlockForQuery

func (s *MemoryStorage) SaveLatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery, blockNum uint64) error

func (*MemoryStorage) SaveLatestLogForQuery

func (s *MemoryStorage) SaveLatestLogForQuery(ctx context.Context, query ethereum.FilterQuery, l types.Log) error

type Query added in v0.0.8

type Query struct {
	ChainID *big.Int
	ethereum.FilterQuery
}

func NewQuery added in v0.0.8

func NewQuery(chainId *big.Int, q ethereum.FilterQuery) Query

func (Query) Hash added in v0.0.8

func (q Query) Hash() common.Hash

type QueryHandler added in v0.0.7

type QueryHandler interface {
	// The handler must update query state itself; otherwise
	// logs may be replayed.
	SubscriberStorage
	// The subscriber calls HandleQuery when incoming logs are ready.
	// If log.Address is address(0), the call only updates the block number.
	HandleQuery(ctx context.Context, query Query, log etypes.Log) error
}

QueryHandler is used only when a handler is set and query.ToBlock == nil.

type QueryStateReader added in v0.0.7

type QueryStateReader interface {
	LatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery) (uint64, error)
	LatestLogForQuery(ctx context.Context, query ethereum.FilterQuery) (etypes.Log, error)

	// FilterLogs returns saved query results to save network I/O.
	FilterLogs(ctx context.Context, q ethereum.FilterQuery) (logs []etypes.Log, err error)
	// IsFilterLogsSupported reports whether the client can use the storage's `FilterLogs` instead of ethclient.FilterLogs.
	IsFilterLogsSupported(q ethereum.FilterQuery) bool
}

type QueryStateWriter added in v0.0.7

type QueryStateWriter interface {
	// Must be called after all logs for the block have been handled.
	SaveLatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery, blockNum uint64) error
	// Must be called after each log has been handled.
	SaveLatestLogForQuery(ctx context.Context, query ethereum.FilterQuery, log etypes.Log) error

	// SaveFilterLogs stores query results to save network I/O.
	SaveFilterLogs(q ethereum.FilterQuery, logs []etypes.Log) (err error)
}

type RedisStorage

type RedisStorage struct {
	// contains filtered or unexported fields
}

func NewRedisStorage

func NewRedisStorage(chainId *big.Int, pool redis.Pool) *RedisStorage

func (*RedisStorage) FilterLogs added in v0.0.11

func (s *RedisStorage) FilterLogs(ctx context.Context, q ethereum.FilterQuery) (logs []types.Log, err error)

func (*RedisStorage) IsFilterLogsSupported added in v0.0.11

func (s *RedisStorage) IsFilterLogsSupported(q ethereum.FilterQuery) bool

func (*RedisStorage) LatestBlockForQuery

func (s *RedisStorage) LatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery) (uint64, error)

func (*RedisStorage) LatestLogForQuery

func (s *RedisStorage) LatestLogForQuery(ctx context.Context, query ethereum.FilterQuery) (types.Log, error)

func (*RedisStorage) QueryLock

func (s *RedisStorage) QueryLock(q ethereum.FilterQuery) sync.Locker

func (*RedisStorage) SaveFilterLogs added in v0.0.11

func (s *RedisStorage) SaveFilterLogs(q ethereum.FilterQuery, logs []types.Log) (err error)

func (*RedisStorage) SaveLatestBlockForQuery

func (s *RedisStorage) SaveLatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery, blockNum uint64) error

func (*RedisStorage) SaveLatestLogForQuery

func (s *RedisStorage) SaveLatestLogForQuery(ctx context.Context, query ethereum.FilterQuery, log types.Log) error

type Subscriber

type Subscriber interface {
	Close()
	GetQueryHandler() QueryHandler
	GetBlockConfirmationsOnSubscription() uint64

	SetBuffer(buffer int)
	SetBlockConfirmationsOnSubscription(confirmations uint64)
	SetQueryHandler(handler QueryHandler) // use QueryHandler instead of SubscriberStorage if handler set
	SetFetchMissingHeaders(enable bool)
	SetRetryInterval(retryInterval time.Duration)

	// SubmitQuery lets a handler submit a query.
	SubmitQuery(query ethereum.FilterQuery) error
	SubscribeNewHead(ctx context.Context, ch chan<- *etypes.Header) (ethereum.Subscription, error)
	SubscribeFilterFullTransactions(ctx context.Context, filter FilterTransaction, ch chan<- *etypes.Transaction) (ethereum.Subscription, error)
	// SubscribeFullPendingTransactions subscribes to new pending transactions.
	SubscribeFullPendingTransactions(ctx context.Context, ch chan<- *etypes.Transaction) (*rpc.ClientSubscription, error)
	// SubscribePendingTransactions subscribes to new pending transaction hashes.
	SubscribePendingTransactions(ctx context.Context, ch chan<- common.Hash) (*rpc.ClientSubscription, error)
	SubscribeFilterFullPendingTransactions(ctx context.Context, filter FilterTransaction, ch chan<- *etypes.Transaction) (*rpc.ClientSubscription, error)
	SubscribeFilterLogs(ctx context.Context, query ethereum.FilterQuery, ch chan<- etypes.Log) (ethereum.Subscription, error)
	FilterLogs(ctx context.Context, q ethereum.FilterQuery) (logs []etypes.Log, err error)
}

Subscriber defines the set of methods for chain subscriptions.

type SubscriberStorage

type SubscriberStorage interface {
	QueryStateReader
	QueryStateWriter
}

SubscriberStorage is used only by `SubscribeFilterLogs` when query.ToBlock == nil.
