Documentation
¶
Overview ¶
Package subscriber: batch log filterer to merge multiple eth_getLogs into one JSON-RPC batch request.
Package subscriber: merge multiple FilterQueries into one so a single eth_getLogs (one JSON-RPC request) can be used instead of N, minimizing RPC cost per cycle.
Index ¶
- Constants
- func GetPartitionKey(q ethereum.FilterQuery, fallbackFrom, fallbackTo *big.Int) string
- func GetQueryHash(chainId *big.Int, query ethereum.FilterQuery) common.Hash
- func GetQueryKey(chainId *big.Int, query ethereum.FilterQuery) string
- func LogMatchesQuery(log *etypes.Log, q ethereum.FilterQuery) bool
- func MergeFilterQueries(queries []ethereum.FilterQuery, fromBlock, toBlock *big.Int) (ethereum.FilterQuery, error)
- func MergeFilterQueriesByBlockHash(queries []ethereum.FilterQuery, blockHash common.Hash) (ethereum.FilterQuery, error)
- func PartitionQueriesByMergeKey(queries []ethereum.FilterQuery, fallbackFrom, fallbackTo *big.Int) [][]ethereum.FilterQuery
- func TopicLayoutKey(q ethereum.FilterQuery) string
- type ChainSubscriber
- func (s *ChainSubscriber) Close()
- func (cs *ChainSubscriber) FilterLogs(ctx context.Context, q ethereum.FilterQuery) (logs []etypes.Log, err error)
- func (cs *ChainSubscriber) FilterLogsWithChannel(ctx context.Context, q ethereum.FilterQuery, logsChan chan<- etypes.Log, ...) (err error)
- func (cs *ChainSubscriber) GetBlockConfirmationsOnSubscription() uint64
- func (cs *ChainSubscriber) GetQueryHandler() QueryHandler
- func (cs *ChainSubscriber) SetBlockConfirmationsOnSubscription(confirmations uint64)
- func (s *ChainSubscriber) SetBlocksPerScan(blocksPerScan uint64)
- func (cs *ChainSubscriber) SetBuffer(buffer int)
- func (cs *ChainSubscriber) SetConsecutiveZeroLogThreshold(n int)
- func (cs *ChainSubscriber) SetFetchMissingHeaders(enable bool)
- func (s *ChainSubscriber) SetMaxBlocksPerScan(maxBlocksPerScan uint64)
- func (cs *ChainSubscriber) SetMaxQueriesPerMerge(n int)
- func (cs *ChainSubscriber) SetQueryHandler(handler QueryHandler)
- func (s *ChainSubscriber) SetRetryInterval(retryInterval time.Duration)
- func (cs *ChainSubscriber) SubmitQuery(query ethereum.FilterQuery) error
- func (cs *ChainSubscriber) SubscribeFilterFullPendingTransactions(ctx context.Context, filter FilterTransaction, ch chan<- *etypes.Transaction) (*rpc.ClientSubscription, error)
- func (cs *ChainSubscriber) SubscribeFilterFullTransactions(ctx context.Context, filter FilterTransaction, ch chan<- *etypes.Transaction) (ethereum.Subscription, error)
- func (cs *ChainSubscriber) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- etypes.Log) (sub ethereum.Subscription, err error)
- func (cs *ChainSubscriber) SubscribeFullPendingTransactions(ctx context.Context, ch chan<- *etypes.Transaction) (*rpc.ClientSubscription, error)
- func (cs *ChainSubscriber) SubscribeNewHead(ctx context.Context, ch chan<- *etypes.Header) (sub ethereum.Subscription, err error)
- func (cs *ChainSubscriber) SubscribePendingTransactions(ctx context.Context, ch chan<- common.Hash) (*rpc.ClientSubscription, error)
- type FilterTransaction
- type LogFiltererBatch
- type MemoryStorage
- func (s *MemoryStorage) FilterLogs(ctx context.Context, q ethereum.FilterQuery) (logs []types.Log, err error)
- func (s *MemoryStorage) IsFilterLogsSupported(q ethereum.FilterQuery) bool
- func (s *MemoryStorage) LatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery) (uint64, error)
- func (s *MemoryStorage) LatestLogForQuery(ctx context.Context, query ethereum.FilterQuery) (types.Log, error)
- func (s *MemoryStorage) SaveFilterLogs(q ethereum.FilterQuery, logs []types.Log) (err error)
- func (s *MemoryStorage) SaveLatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery, blockNum uint64) error
- func (s *MemoryStorage) SaveLatestLogForQuery(ctx context.Context, query ethereum.FilterQuery, l types.Log) error
- type Query
- type QueryHandler
- type QueryStateReader
- type QueryStateWriter
- type RedisStorage
- func (s *RedisStorage) FilterLogs(ctx context.Context, q ethereum.FilterQuery) (logs []types.Log, err error)
- func (s *RedisStorage) IsFilterLogsSupported(q ethereum.FilterQuery) bool
- func (s *RedisStorage) LatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery) (uint64, error)
- func (s *RedisStorage) LatestLogForQuery(ctx context.Context, query ethereum.FilterQuery) (types.Log, error)
- func (s *RedisStorage) QueryLock(q ethereum.FilterQuery) sync.Locker
- func (s *RedisStorage) SaveFilterLogs(q ethereum.FilterQuery, logs []types.Log) (err error)
- func (s *RedisStorage) SaveLatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery, blockNum uint64) error
- func (s *RedisStorage) SaveLatestLogForQuery(ctx context.Context, query ethereum.FilterQuery, log types.Log) error
- type Subscriber
- type SubscriberStorage
Constants ¶
const DefaultConsecutiveZeroLogThreshold = 3
runRealtimeScanner runs one loop per chain: collect all realtime queries, one FilterLogsBatch, then dispatch. DefaultConsecutiveZeroLogThreshold is the number of consecutive scans returning 0 logs before we force-advance FromBlock. This prevents the scanner from getting stuck when there are genuinely no events for an extended period (which would otherwise cause the scan range to grow unboundedly and exceed RPC block range limits).
Variables ¶
This section is empty.
Functions ¶
func GetPartitionKey ¶ added in v0.1.38
func GetPartitionKey(q ethereum.FilterQuery, fallbackFrom, fallbackTo *big.Int) string
GetPartitionKey returns a stable key for grouping FilterQueries that can be merged into one eth_getLogs. Same key ⇒ same block selector (range or blockHash) ⇒ can merge. For range queries, if FromBlock/ToBlock are nil, fallbackFrom/fallbackTo are used when non-nil; otherwise "nil" is used so nil range does not merge with a concrete range.
func GetQueryHash ¶ added in v0.0.2
func GetQueryKey ¶
func GetQueryKey(chainId *big.Int, query ethereum.FilterQuery) string
func LogMatchesQuery ¶ added in v0.1.38
func LogMatchesQuery(log *etypes.Log, q ethereum.FilterQuery) bool
LogMatchesQuery reports whether a log matches the given FilterQuery (address and topics).
func MergeFilterQueries ¶ added in v0.1.38
func MergeFilterQueries(queries []ethereum.FilterQuery, fromBlock, toBlock *big.Int) (ethereum.FilterQuery, error)
MergeFilterQueries merges multiple FilterQueries that share the same block range into one FilterQuery: union of Addresses and union of Topics per index. The result is suitable for a single eth_getLogs call; each returned log can then be matched to the original queries via LogMatchesQuery. fromBlock and toBlock are set on the merged query; BlockHash must be nil for all.
func MergeFilterQueriesByBlockHash ¶ added in v0.1.38
func MergeFilterQueriesByBlockHash(queries []ethereum.FilterQuery, blockHash common.Hash) (ethereum.FilterQuery, error)
MergeFilterQueriesByBlockHash merges multiple FilterQueries that share the same BlockHash into one FilterQuery (union of Addresses and Topics). Used for greedy minimization: all queries with the same BlockHash become one eth_getLogs.
func PartitionQueriesByMergeKey ¶ added in v0.1.38
func PartitionQueriesByMergeKey(queries []ethereum.FilterQuery, fallbackFrom, fallbackTo *big.Int) [][]ethereum.FilterQuery
PartitionQueriesByMergeKey splits queries into groups that share the same partition key (same block range or same BlockHash). Each group can be merged into one eth_getLogs. fallbackFrom/fallbackTo are used for queries with nil FromBlock/ToBlock when provided.
func TopicLayoutKey ¶ added in v0.1.40
func TopicLayoutKey(q ethereum.FilterQuery) string
TopicLayoutKey returns a canonical key for the topic filter layout of q (which topic indices are set vs nil). Queries with the same layout can be merged without losing address filters: e.g. CT events use topic1=stakeholder, OrderFilled uses topic2=maker; merging them would set both topic1 and topic2 to nil (any query has nil → merged nil) and fetch full protocol events. Only merge queries that share the same layout so address filters are preserved. Uses up to 4 topic slots (standard for indexed params).
Types ¶
type ChainSubscriber ¶
type ChainSubscriber struct {
// contains filtered or unexported fields
}
ChainSubscriber implements Subscriber interface
func NewChainSubscriber ¶
func NewChainSubscriber(rpcCli *rpc.Client, storage SubscriberStorage) (*ChainSubscriber, error)
NewChainSubscriber .
func (*ChainSubscriber) Close ¶ added in v0.0.7
func (s *ChainSubscriber) Close()
func (*ChainSubscriber) FilterLogs ¶
func (cs *ChainSubscriber) FilterLogs(ctx context.Context, q ethereum.FilterQuery) (logs []etypes.Log, err error)
func (*ChainSubscriber) FilterLogsWithChannel ¶
func (cs *ChainSubscriber) FilterLogsWithChannel(ctx context.Context, q ethereum.FilterQuery, logsChan chan<- etypes.Log, watch bool, closeOnExit bool) (err error)
TODO: 3. cache all of finalized historical data, e.g., blockByHash, txByHash
func (*ChainSubscriber) GetBlockConfirmationsOnSubscription ¶ added in v0.0.2
func (cs *ChainSubscriber) GetBlockConfirmationsOnSubscription() uint64
func (*ChainSubscriber) GetQueryHandler ¶ added in v0.0.9
func (cs *ChainSubscriber) GetQueryHandler() QueryHandler
func (*ChainSubscriber) SetBlockConfirmationsOnSubscription ¶ added in v0.0.2
func (cs *ChainSubscriber) SetBlockConfirmationsOnSubscription(confirmations uint64)
func (*ChainSubscriber) SetBlocksPerScan ¶ added in v0.0.6
func (s *ChainSubscriber) SetBlocksPerScan(blocksPerScan uint64)
func (*ChainSubscriber) SetBuffer ¶ added in v0.0.10
func (cs *ChainSubscriber) SetBuffer(buffer int)
func (*ChainSubscriber) SetConsecutiveZeroLogThreshold ¶ added in v0.1.43
func (cs *ChainSubscriber) SetConsecutiveZeroLogThreshold(n int)
SetConsecutiveZeroLogThreshold sets how many consecutive 0-log scan cycles are allowed before forcing FromBlock to advance. Default is 3. This prevents the scanner from getting stuck when there are genuinely no events, which would cause the scan range to grow unboundedly and hit RPC block range limits.
func (*ChainSubscriber) SetFetchMissingHeaders ¶ added in v0.1.11
func (cs *ChainSubscriber) SetFetchMissingHeaders(enable bool)
func (*ChainSubscriber) SetMaxBlocksPerScan ¶ added in v0.1.4
func (s *ChainSubscriber) SetMaxBlocksPerScan(maxBlocksPerScan uint64)
func (*ChainSubscriber) SetMaxQueriesPerMerge ¶ added in v0.1.39
func (cs *ChainSubscriber) SetMaxQueriesPerMerge(n int)
SetMaxQueriesPerMerge sets the maximum number of queries to merge into one eth_getLogs. When > 0 and a partition has more queries, they are split into batches; each batch is requested separately and results are merged and sorted. Use 2 on BSC to avoid nodes returning 0 for large merged filters. 0 = no limit (default).
func (*ChainSubscriber) SetQueryHandler ¶ added in v0.0.7
func (cs *ChainSubscriber) SetQueryHandler(handler QueryHandler)
func (*ChainSubscriber) SetRetryInterval ¶ added in v0.0.6
func (s *ChainSubscriber) SetRetryInterval(retryInterval time.Duration)
func (*ChainSubscriber) SubmitQuery ¶ added in v0.0.7
func (cs *ChainSubscriber) SubmitQuery(query ethereum.FilterQuery) error
func (*ChainSubscriber) SubscribeFilterFullPendingTransactions ¶ added in v0.1.18
func (cs *ChainSubscriber) SubscribeFilterFullPendingTransactions(ctx context.Context, filter FilterTransaction, ch chan<- *etypes.Transaction) (*rpc.ClientSubscription, error)
func (*ChainSubscriber) SubscribeFilterFullTransactions ¶ added in v0.1.18
func (cs *ChainSubscriber) SubscribeFilterFullTransactions(ctx context.Context, filter FilterTransaction, ch chan<- *etypes.Transaction) (ethereum.Subscription, error)
func (*ChainSubscriber) SubscribeFilterLogs ¶
func (cs *ChainSubscriber) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- etypes.Log) (sub ethereum.Subscription, err error)
func (*ChainSubscriber) SubscribeFullPendingTransactions ¶ added in v0.1.18
func (cs *ChainSubscriber) SubscribeFullPendingTransactions(ctx context.Context, ch chan<- *etypes.Transaction) (*rpc.ClientSubscription, error)
SubscribeFullPendingTransactions subscribes to new pending transactions.
func (*ChainSubscriber) SubscribeNewHead ¶
func (cs *ChainSubscriber) SubscribeNewHead(ctx context.Context, ch chan<- *etypes.Header) (sub ethereum.Subscription, err error)
SubscribeNewHead .
func (*ChainSubscriber) SubscribePendingTransactions ¶ added in v0.1.18
func (cs *ChainSubscriber) SubscribePendingTransactions(ctx context.Context, ch chan<- common.Hash) (*rpc.ClientSubscription, error)
SubscribePendingTransactions subscribes to new pending transaction hashes.
type FilterTransaction ¶ added in v0.1.18
type FilterTransaction struct {
FromBlock *big.Int // beginning of the queried range, nil means genesis block. only used for historical data
ToBlock *big.Int // end of the range, nil means latest block. only used for historical data
// Any of these conditions meet, the transaction is considered.
From []common.Address
To []common.Address
MethodSelector []types.MethodSelector
}
type LogFiltererBatch ¶ added in v0.1.37
type LogFiltererBatch interface {
FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]etypes.Log, error)
FilterLogsBatch(ctx context.Context, queries []ethereum.FilterQuery) ([][]etypes.Log, error)
}
LogFiltererBatch extends log filtering with a batch call to reduce RPC round-trips. When multiple subscriptions share the same chain and block range, one FilterLogsBatch can replace N separate FilterLogs calls (e.g. one HTTP request via JSON-RPC batch).
func NewBatchLogFilterer ¶ added in v0.1.37
func NewBatchLogFilterer(c *ethclient.Client, rpcCli *rpc.Client) LogFiltererBatch
NewBatchLogFilterer returns a LogFiltererBatch that uses BatchCallContext for FilterLogsBatch.
type MemoryStorage ¶
type MemoryStorage struct {
// contains filtered or unexported fields
}
func NewMemoryStorage ¶
func NewMemoryStorage(chainId *big.Int) *MemoryStorage
func (*MemoryStorage) FilterLogs ¶ added in v0.0.11
func (s *MemoryStorage) FilterLogs(ctx context.Context, q ethereum.FilterQuery) (logs []types.Log, err error)
func (*MemoryStorage) IsFilterLogsSupported ¶ added in v0.0.11
func (s *MemoryStorage) IsFilterLogsSupported(q ethereum.FilterQuery) bool
func (*MemoryStorage) LatestBlockForQuery ¶
func (s *MemoryStorage) LatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery) (uint64, error)
func (*MemoryStorage) LatestLogForQuery ¶
func (s *MemoryStorage) LatestLogForQuery(ctx context.Context, query ethereum.FilterQuery) (types.Log, error)
func (*MemoryStorage) SaveFilterLogs ¶ added in v0.0.11
func (s *MemoryStorage) SaveFilterLogs(q ethereum.FilterQuery, logs []types.Log) (err error)
func (*MemoryStorage) SaveLatestBlockForQuery ¶
func (s *MemoryStorage) SaveLatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery, blockNum uint64) error
func (*MemoryStorage) SaveLatestLogForQuery ¶
func (s *MemoryStorage) SaveLatestLogForQuery(ctx context.Context, query ethereum.FilterQuery, l types.Log) error
type QueryHandler ¶ added in v0.0.7
type QueryHandler interface {
// Please update query states by handler-self, otherwise
// logs may be replayed
SubscriberStorage
// Subscriber will call back it for handling when incoming logs are ready.
// If log.Address is address(0), just for updating block number
HandleQuery(ctx context.Context, query Query, log etypes.Log) error
}
Used only for handler set && query.ToBlock == nil
type QueryStateReader ¶ added in v0.0.7
type QueryStateReader interface {
LatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery) (uint64, error)
LatestLogForQuery(ctx context.Context, query ethereum.FilterQuery) (etypes.Log, error)
// Save query result to save network io
FilterLogs(ctx context.Context, q ethereum.FilterQuery) (logs []etypes.Log, err error)
// Report whether client can use `FilterLogs` in the storage instead of ethclient.FilterLogs
IsFilterLogsSupported(q ethereum.FilterQuery) bool
}
type QueryStateWriter ¶ added in v0.0.7
type QueryStateWriter interface {
// Must call the function after all logs was handled for the block.
SaveLatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery, blockNum uint64) error
// Must call the function after each log was handled .
SaveLatestLogForQuery(ctx context.Context, query ethereum.FilterQuery, log etypes.Log) error
// Save query result to save network io
SaveFilterLogs(q ethereum.FilterQuery, logs []etypes.Log) (err error)
}
type RedisStorage ¶
type RedisStorage struct {
// contains filtered or unexported fields
}
func NewRedisStorage ¶
func NewRedisStorage(chainId *big.Int, pool redis.Pool) *RedisStorage
func (*RedisStorage) FilterLogs ¶ added in v0.0.11
func (s *RedisStorage) FilterLogs(ctx context.Context, q ethereum.FilterQuery) (logs []types.Log, err error)
func (*RedisStorage) IsFilterLogsSupported ¶ added in v0.0.11
func (s *RedisStorage) IsFilterLogsSupported(q ethereum.FilterQuery) bool
func (*RedisStorage) LatestBlockForQuery ¶
func (s *RedisStorage) LatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery) (uint64, error)
func (*RedisStorage) LatestLogForQuery ¶
func (s *RedisStorage) LatestLogForQuery(ctx context.Context, query ethereum.FilterQuery) (types.Log, error)
func (*RedisStorage) QueryLock ¶
func (s *RedisStorage) QueryLock(q ethereum.FilterQuery) sync.Locker
func (*RedisStorage) SaveFilterLogs ¶ added in v0.0.11
func (s *RedisStorage) SaveFilterLogs(q ethereum.FilterQuery, logs []types.Log) (err error)
func (*RedisStorage) SaveLatestBlockForQuery ¶
func (s *RedisStorage) SaveLatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery, blockNum uint64) error
func (*RedisStorage) SaveLatestLogForQuery ¶
func (s *RedisStorage) SaveLatestLogForQuery(ctx context.Context, query ethereum.FilterQuery, log types.Log) error
type Subscriber ¶
type Subscriber interface {
Close()
GetQueryHandler() QueryHandler
GetBlockConfirmationsOnSubscription() uint64
SetBuffer(buffer int)
SetBlockConfirmationsOnSubscription(confirmations uint64)
SetQueryHandler(handler QueryHandler) // use QueryHandler instead of SubscriberStorage if handler set
SetFetchMissingHeaders(enable bool)
SetRetryInterval(retryInterval time.Duration)
// Provided for handler submitting query.
SubmitQuery(query ethereum.FilterQuery) error
SubscribeNewHead(ctx context.Context, ch chan<- *etypes.Header) (ethereum.Subscription, error)
SubscribeFilterFullTransactions(ctx context.Context, filter FilterTransaction, ch chan<- *etypes.Transaction) (ethereum.Subscription, error)
// SubscribeFullPendingTransactions subscribes to new pending transactions.
SubscribeFullPendingTransactions(ctx context.Context, ch chan<- *etypes.Transaction) (*rpc.ClientSubscription, error)
// SubscribePendingTransactions subscribes to new pending transaction hashes.
SubscribePendingTransactions(ctx context.Context, ch chan<- common.Hash) (*rpc.ClientSubscription, error)
SubscribeFilterFullPendingTransactions(ctx context.Context, filter FilterTransaction, ch chan<- *etypes.Transaction) (*rpc.ClientSubscription, error)
SubscribeFilterLogs(ctx context.Context, query ethereum.FilterQuery, ch chan<- etypes.Log) (ethereum.Subscription, error)
FilterLogs(ctx context.Context, q ethereum.FilterQuery) (logs []etypes.Log, err error)
}
Subscriber represents a set of methods about chain subscription
type SubscriberStorage ¶
type SubscriberStorage interface {
QueryStateReader
QueryStateWriter
}
Used only for function `SubscribeFilterlogs` && query.ToBlock == nil