Documentation
¶
Overview ¶
Package subscriber: batch log filterer to merge multiple eth_getLogs into one JSON-RPC batch request.
Package subscriber: merge multiple FilterQueries into one so a single eth_getLogs (one JSON-RPC request) can be used instead of N, minimizing RPC cost per cycle.
Index ¶
- func GetPartitionKey(q ethereum.FilterQuery, fallbackFrom, fallbackTo *big.Int) string
- func GetQueryHash(chainId *big.Int, query ethereum.FilterQuery) common.Hash
- func GetQueryKey(chainId *big.Int, query ethereum.FilterQuery) string
- func LogMatchesQuery(log *etypes.Log, q ethereum.FilterQuery) bool
- func MergeFilterQueries(queries []ethereum.FilterQuery, fromBlock, toBlock *big.Int) (ethereum.FilterQuery, error)
- func MergeFilterQueriesByBlockHash(queries []ethereum.FilterQuery, blockHash common.Hash) (ethereum.FilterQuery, error)
- func PartitionQueriesByMergeKey(queries []ethereum.FilterQuery, fallbackFrom, fallbackTo *big.Int) [][]ethereum.FilterQuery
- type ChainSubscriber
- func (s *ChainSubscriber) Close()
- func (cs *ChainSubscriber) FilterLogs(ctx context.Context, q ethereum.FilterQuery) (logs []etypes.Log, err error)
- func (cs *ChainSubscriber) FilterLogsWithChannel(ctx context.Context, q ethereum.FilterQuery, logsChan chan<- etypes.Log, ...) (err error)
- func (cs *ChainSubscriber) GetBlockConfirmationsOnSubscription() uint64
- func (cs *ChainSubscriber) GetQueryHandler() QueryHandler
- func (cs *ChainSubscriber) SetBlockConfirmationsOnSubscription(confirmations uint64)
- func (s *ChainSubscriber) SetBlocksPerScan(blocksPerScan uint64)
- func (cs *ChainSubscriber) SetBuffer(buffer int)
- func (cs *ChainSubscriber) SetFetchMissingHeaders(enable bool)
- func (s *ChainSubscriber) SetMaxBlocksPerScan(maxBlocksPerScan uint64)
- func (cs *ChainSubscriber) SetQueryHandler(handler QueryHandler)
- func (s *ChainSubscriber) SetRetryInterval(retryInterval time.Duration)
- func (cs *ChainSubscriber) SubmitQuery(query ethereum.FilterQuery) error
- func (cs *ChainSubscriber) SubscribeFilterFullPendingTransactions(ctx context.Context, filter FilterTransaction, ch chan<- *etypes.Transaction) (*rpc.ClientSubscription, error)
- func (cs *ChainSubscriber) SubscribeFilterFullTransactions(ctx context.Context, filter FilterTransaction, ch chan<- *etypes.Transaction) (ethereum.Subscription, error)
- func (cs *ChainSubscriber) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- etypes.Log) (sub ethereum.Subscription, err error)
- func (cs *ChainSubscriber) SubscribeFullPendingTransactions(ctx context.Context, ch chan<- *etypes.Transaction) (*rpc.ClientSubscription, error)
- func (cs *ChainSubscriber) SubscribeNewHead(ctx context.Context, ch chan<- *etypes.Header) (sub ethereum.Subscription, err error)
- func (cs *ChainSubscriber) SubscribePendingTransactions(ctx context.Context, ch chan<- common.Hash) (*rpc.ClientSubscription, error)
- type FilterTransaction
- type LogFiltererBatch
- type MemoryStorage
- func (s *MemoryStorage) FilterLogs(ctx context.Context, q ethereum.FilterQuery) (logs []types.Log, err error)
- func (s *MemoryStorage) IsFilterLogsSupported(q ethereum.FilterQuery) bool
- func (s *MemoryStorage) LatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery) (uint64, error)
- func (s *MemoryStorage) LatestLogForQuery(ctx context.Context, query ethereum.FilterQuery) (types.Log, error)
- func (s *MemoryStorage) SaveFilterLogs(q ethereum.FilterQuery, logs []types.Log) (err error)
- func (s *MemoryStorage) SaveLatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery, blockNum uint64) error
- func (s *MemoryStorage) SaveLatestLogForQuery(ctx context.Context, query ethereum.FilterQuery, l types.Log) error
- type Query
- type QueryHandler
- type QueryStateReader
- type QueryStateWriter
- type RedisStorage
- func (s *RedisStorage) FilterLogs(ctx context.Context, q ethereum.FilterQuery) (logs []types.Log, err error)
- func (s *RedisStorage) IsFilterLogsSupported(q ethereum.FilterQuery) bool
- func (s *RedisStorage) LatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery) (uint64, error)
- func (s *RedisStorage) LatestLogForQuery(ctx context.Context, query ethereum.FilterQuery) (types.Log, error)
- func (s *RedisStorage) QueryLock(q ethereum.FilterQuery) sync.Locker
- func (s *RedisStorage) SaveFilterLogs(q ethereum.FilterQuery, logs []types.Log) (err error)
- func (s *RedisStorage) SaveLatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery, blockNum uint64) error
- func (s *RedisStorage) SaveLatestLogForQuery(ctx context.Context, query ethereum.FilterQuery, log types.Log) error
- type Subscriber
- type SubscriberStorage
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GetPartitionKey ¶ added in v0.1.38
func GetPartitionKey(q ethereum.FilterQuery, fallbackFrom, fallbackTo *big.Int) string
GetPartitionKey returns a stable key for grouping FilterQueries that can be merged into one eth_getLogs. Same key ⇒ same block selector (range or blockHash) ⇒ can merge. For range queries, if FromBlock/ToBlock are nil, fallbackFrom/fallbackTo are used when non-nil; otherwise "nil" is used so nil range does not merge with a concrete range.
func GetQueryHash ¶ added in v0.0.2
func GetQueryKey ¶
func GetQueryKey(chainId *big.Int, query ethereum.FilterQuery) string
func LogMatchesQuery ¶ added in v0.1.38
func LogMatchesQuery(log *etypes.Log, q ethereum.FilterQuery) bool
LogMatchesQuery reports whether a log matches the given FilterQuery (address and topics).
func MergeFilterQueries ¶ added in v0.1.38
func MergeFilterQueries(queries []ethereum.FilterQuery, fromBlock, toBlock *big.Int) (ethereum.FilterQuery, error)
MergeFilterQueries merges multiple FilterQueries that share the same block range into one FilterQuery: union of Addresses and union of Topics per index. The result is suitable for a single eth_getLogs call; each returned log can then be matched to the original queries via LogMatchesQuery. fromBlock and toBlock are set on the merged query; BlockHash must be nil for all.
func MergeFilterQueriesByBlockHash ¶ added in v0.1.38
func MergeFilterQueriesByBlockHash(queries []ethereum.FilterQuery, blockHash common.Hash) (ethereum.FilterQuery, error)
MergeFilterQueriesByBlockHash merges multiple FilterQueries that share the same BlockHash into one FilterQuery (union of Addresses and Topics). Used for greedy minimization: all queries with the same BlockHash become one eth_getLogs.
func PartitionQueriesByMergeKey ¶ added in v0.1.38
func PartitionQueriesByMergeKey(queries []ethereum.FilterQuery, fallbackFrom, fallbackTo *big.Int) [][]ethereum.FilterQuery
PartitionQueriesByMergeKey splits queries into groups that share the same partition key (same block range or same BlockHash). Each group can be merged into one eth_getLogs. fallbackFrom/fallbackTo are used for queries with nil FromBlock/ToBlock when provided.
Types ¶
type ChainSubscriber ¶
type ChainSubscriber struct {
// contains filtered or unexported fields
}
ChainSubscriber implements Subscriber interface
func NewChainSubscriber ¶
func NewChainSubscriber(rpcCli *rpc.Client, storage SubscriberStorage) (*ChainSubscriber, error)
NewChainSubscriber .
func (*ChainSubscriber) Close ¶ added in v0.0.7
func (s *ChainSubscriber) Close()
func (*ChainSubscriber) FilterLogs ¶
func (cs *ChainSubscriber) FilterLogs(ctx context.Context, q ethereum.FilterQuery) (logs []etypes.Log, err error)
func (*ChainSubscriber) FilterLogsWithChannel ¶
func (cs *ChainSubscriber) FilterLogsWithChannel(ctx context.Context, q ethereum.FilterQuery, logsChan chan<- etypes.Log, watch bool, closeOnExit bool) (err error)
TODO: 3. cache all of finalized historical data, e.g., blockByHash, txByHash
func (*ChainSubscriber) GetBlockConfirmationsOnSubscription ¶ added in v0.0.2
func (cs *ChainSubscriber) GetBlockConfirmationsOnSubscription() uint64
func (*ChainSubscriber) GetQueryHandler ¶ added in v0.0.9
func (cs *ChainSubscriber) GetQueryHandler() QueryHandler
func (*ChainSubscriber) SetBlockConfirmationsOnSubscription ¶ added in v0.0.2
func (cs *ChainSubscriber) SetBlockConfirmationsOnSubscription(confirmations uint64)
func (*ChainSubscriber) SetBlocksPerScan ¶ added in v0.0.6
func (s *ChainSubscriber) SetBlocksPerScan(blocksPerScan uint64)
func (*ChainSubscriber) SetBuffer ¶ added in v0.0.10
func (cs *ChainSubscriber) SetBuffer(buffer int)
func (*ChainSubscriber) SetFetchMissingHeaders ¶ added in v0.1.11
func (cs *ChainSubscriber) SetFetchMissingHeaders(enable bool)
func (*ChainSubscriber) SetMaxBlocksPerScan ¶ added in v0.1.4
func (s *ChainSubscriber) SetMaxBlocksPerScan(maxBlocksPerScan uint64)
func (*ChainSubscriber) SetQueryHandler ¶ added in v0.0.7
func (cs *ChainSubscriber) SetQueryHandler(handler QueryHandler)
func (*ChainSubscriber) SetRetryInterval ¶ added in v0.0.6
func (s *ChainSubscriber) SetRetryInterval(retryInterval time.Duration)
func (*ChainSubscriber) SubmitQuery ¶ added in v0.0.7
func (cs *ChainSubscriber) SubmitQuery(query ethereum.FilterQuery) error
func (*ChainSubscriber) SubscribeFilterFullPendingTransactions ¶ added in v0.1.18
func (cs *ChainSubscriber) SubscribeFilterFullPendingTransactions(ctx context.Context, filter FilterTransaction, ch chan<- *etypes.Transaction) (*rpc.ClientSubscription, error)
func (*ChainSubscriber) SubscribeFilterFullTransactions ¶ added in v0.1.18
func (cs *ChainSubscriber) SubscribeFilterFullTransactions(ctx context.Context, filter FilterTransaction, ch chan<- *etypes.Transaction) (ethereum.Subscription, error)
func (*ChainSubscriber) SubscribeFilterLogs ¶
func (cs *ChainSubscriber) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- etypes.Log) (sub ethereum.Subscription, err error)
func (*ChainSubscriber) SubscribeFullPendingTransactions ¶ added in v0.1.18
func (cs *ChainSubscriber) SubscribeFullPendingTransactions(ctx context.Context, ch chan<- *etypes.Transaction) (*rpc.ClientSubscription, error)
SubscribeFullPendingTransactions subscribes to new pending transactions.
func (*ChainSubscriber) SubscribeNewHead ¶
func (cs *ChainSubscriber) SubscribeNewHead(ctx context.Context, ch chan<- *etypes.Header) (sub ethereum.Subscription, err error)
SubscribeNewHead .
func (*ChainSubscriber) SubscribePendingTransactions ¶ added in v0.1.18
func (cs *ChainSubscriber) SubscribePendingTransactions(ctx context.Context, ch chan<- common.Hash) (*rpc.ClientSubscription, error)
SubscribePendingTransactions subscribes to new pending transaction hashes.
type FilterTransaction ¶ added in v0.1.18
type FilterTransaction struct {
FromBlock *big.Int // beginning of the queried range, nil means genesis block. only used for historical data
ToBlock *big.Int // end of the range, nil means latest block. only used for historical data
// Any of these conditions meet, the transaction is considered.
From []common.Address
To []common.Address
MethodSelector []types.MethodSelector
}
type LogFiltererBatch ¶ added in v0.1.37
type LogFiltererBatch interface {
FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]etypes.Log, error)
FilterLogsBatch(ctx context.Context, queries []ethereum.FilterQuery) ([][]etypes.Log, error)
}
LogFiltererBatch extends log filtering with a batch call to reduce RPC round-trips. When multiple subscriptions share the same chain and block range, one FilterLogsBatch can replace N separate FilterLogs calls (e.g. one HTTP request via JSON-RPC batch).
func NewBatchLogFilterer ¶ added in v0.1.37
func NewBatchLogFilterer(c *ethclient.Client, rpcCli *rpc.Client) LogFiltererBatch
NewBatchLogFilterer returns a LogFiltererBatch that uses BatchCallContext for FilterLogsBatch.
type MemoryStorage ¶
type MemoryStorage struct {
// contains filtered or unexported fields
}
func NewMemoryStorage ¶
func NewMemoryStorage(chainId *big.Int) *MemoryStorage
func (*MemoryStorage) FilterLogs ¶ added in v0.0.11
func (s *MemoryStorage) FilterLogs(ctx context.Context, q ethereum.FilterQuery) (logs []types.Log, err error)
func (*MemoryStorage) IsFilterLogsSupported ¶ added in v0.0.11
func (s *MemoryStorage) IsFilterLogsSupported(q ethereum.FilterQuery) bool
func (*MemoryStorage) LatestBlockForQuery ¶
func (s *MemoryStorage) LatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery) (uint64, error)
func (*MemoryStorage) LatestLogForQuery ¶
func (s *MemoryStorage) LatestLogForQuery(ctx context.Context, query ethereum.FilterQuery) (types.Log, error)
func (*MemoryStorage) SaveFilterLogs ¶ added in v0.0.11
func (s *MemoryStorage) SaveFilterLogs(q ethereum.FilterQuery, logs []types.Log) (err error)
func (*MemoryStorage) SaveLatestBlockForQuery ¶
func (s *MemoryStorage) SaveLatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery, blockNum uint64) error
func (*MemoryStorage) SaveLatestLogForQuery ¶
func (s *MemoryStorage) SaveLatestLogForQuery(ctx context.Context, query ethereum.FilterQuery, l types.Log) error
type QueryHandler ¶ added in v0.0.7
type QueryHandler interface {
// Please update query states by handler-self, otherwise
// logs may be replayed
SubscriberStorage
// Subscriber will call back it for handling when incoming logs are ready.
// If log.Address is address(0), just for updating block number
HandleQuery(ctx context.Context, query Query, log etypes.Log) error
}
Used only for handler set && query.ToBlock == nil
type QueryStateReader ¶ added in v0.0.7
type QueryStateReader interface {
LatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery) (uint64, error)
LatestLogForQuery(ctx context.Context, query ethereum.FilterQuery) (etypes.Log, error)
// Save query result to save network io
FilterLogs(ctx context.Context, q ethereum.FilterQuery) (logs []etypes.Log, err error)
// Report whether client can use `FilterLogs` in the storage instead of ethclient.FilterLogs
IsFilterLogsSupported(q ethereum.FilterQuery) bool
}
type QueryStateWriter ¶ added in v0.0.7
type QueryStateWriter interface {
// Must call the function after all logs was handled for the block.
SaveLatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery, blockNum uint64) error
// Must call the function after each log was handled .
SaveLatestLogForQuery(ctx context.Context, query ethereum.FilterQuery, log etypes.Log) error
// Save query result to save network io
SaveFilterLogs(q ethereum.FilterQuery, logs []etypes.Log) (err error)
}
type RedisStorage ¶
type RedisStorage struct {
// contains filtered or unexported fields
}
func NewRedisStorage ¶
func NewRedisStorage(chainId *big.Int, pool redis.Pool) *RedisStorage
func (*RedisStorage) FilterLogs ¶ added in v0.0.11
func (s *RedisStorage) FilterLogs(ctx context.Context, q ethereum.FilterQuery) (logs []types.Log, err error)
func (*RedisStorage) IsFilterLogsSupported ¶ added in v0.0.11
func (s *RedisStorage) IsFilterLogsSupported(q ethereum.FilterQuery) bool
func (*RedisStorage) LatestBlockForQuery ¶
func (s *RedisStorage) LatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery) (uint64, error)
func (*RedisStorage) LatestLogForQuery ¶
func (s *RedisStorage) LatestLogForQuery(ctx context.Context, query ethereum.FilterQuery) (types.Log, error)
func (*RedisStorage) QueryLock ¶
func (s *RedisStorage) QueryLock(q ethereum.FilterQuery) sync.Locker
func (*RedisStorage) SaveFilterLogs ¶ added in v0.0.11
func (s *RedisStorage) SaveFilterLogs(q ethereum.FilterQuery, logs []types.Log) (err error)
func (*RedisStorage) SaveLatestBlockForQuery ¶
func (s *RedisStorage) SaveLatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery, blockNum uint64) error
func (*RedisStorage) SaveLatestLogForQuery ¶
func (s *RedisStorage) SaveLatestLogForQuery(ctx context.Context, query ethereum.FilterQuery, log types.Log) error
type Subscriber ¶
type Subscriber interface {
Close()
GetQueryHandler() QueryHandler
GetBlockConfirmationsOnSubscription() uint64
SetBuffer(buffer int)
SetBlockConfirmationsOnSubscription(confirmations uint64)
SetQueryHandler(handler QueryHandler) // use QueryHandler instead of SubscriberStorage if handler set
SetFetchMissingHeaders(enable bool)
// Provided for handler submitting query.
SubmitQuery(query ethereum.FilterQuery) error
SubscribeNewHead(ctx context.Context, ch chan<- *etypes.Header) (ethereum.Subscription, error)
SubscribeFilterFullTransactions(ctx context.Context, filter FilterTransaction, ch chan<- *etypes.Transaction) (ethereum.Subscription, error)
// SubscribeFullPendingTransactions subscribes to new pending transactions.
SubscribeFullPendingTransactions(ctx context.Context, ch chan<- *etypes.Transaction) (*rpc.ClientSubscription, error)
// SubscribePendingTransactions subscribes to new pending transaction hashes.
SubscribePendingTransactions(ctx context.Context, ch chan<- common.Hash) (*rpc.ClientSubscription, error)
SubscribeFilterFullPendingTransactions(ctx context.Context, filter FilterTransaction, ch chan<- *etypes.Transaction) (*rpc.ClientSubscription, error)
SubscribeFilterLogs(ctx context.Context, query ethereum.FilterQuery, ch chan<- etypes.Log) (ethereum.Subscription, error)
FilterLogs(ctx context.Context, q ethereum.FilterQuery) (logs []etypes.Log, err error)
}
Subscriber represents a set of methods about chain subscription
type SubscriberStorage ¶
type SubscriberStorage interface {
QueryStateReader
QueryStateWriter
}
Used only for function `SubscribeFilterlogs` && query.ToBlock == nil