Documentation
¶
Index ¶
- func GetQueryHash(chainId *big.Int, query ethereum.FilterQuery) common.Hash
- func GetQueryKey(chainId *big.Int, query ethereum.FilterQuery) string
- type ChainSubscriber
- func (s *ChainSubscriber) Close()
- func (cs *ChainSubscriber) DeleteQuery(query ethereum.FilterQuery) error
- func (cs *ChainSubscriber) FilterLogs(ctx context.Context, q ethereum.FilterQuery) (logs []etypes.Log, err error)
- func (cs *ChainSubscriber) FilterLogsBatch(ctx context.Context, queries []ethereum.FilterQuery) (logs [][]etypes.Log, err error)
- func (cs *ChainSubscriber) FilterLogsWithChannel(ctx context.Context, q ethereum.FilterQuery, logsChan chan<- etypes.Log, ...) (err error)
- func (cs *ChainSubscriber) GetBlockConfirmationsOnSubscription() uint64
- func (cs *ChainSubscriber) GetQueryHandler() QueryHandler
- func (cs *ChainSubscriber) SetBlockConfirmationsOnSubscription(confirmations uint64)
- func (s *ChainSubscriber) SetBlocksPerScan(blocksPerScan uint64)
- func (cs *ChainSubscriber) SetBuffer(buffer int)
- func (cs *ChainSubscriber) SetFetchMissingHeaders(enable bool)
- func (s *ChainSubscriber) SetMaxBlocksPerScan(maxBlocksPerScan uint64)
- func (cs *ChainSubscriber) SetQueryHandler(handler QueryHandler)
- func (s *ChainSubscriber) SetRetryInterval(retryInterval time.Duration)
- func (cs *ChainSubscriber) SubmitQuery(query ethereum.FilterQuery) error
- func (cs *ChainSubscriber) SubscribeFilterFullPendingTransactions(ctx context.Context, filter FilterTransaction, ch chan<- *etypes.Transaction) (*rpc.ClientSubscription, error)
- func (cs *ChainSubscriber) SubscribeFilterFullTransactions(ctx context.Context, filter FilterTransaction, ch chan<- *etypes.Transaction) (ethereum.Subscription, error)
- func (cs *ChainSubscriber) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- etypes.Log) (sub ethereum.Subscription, err error)
- func (cs *ChainSubscriber) SubscribeFullPendingTransactions(ctx context.Context, ch chan<- *etypes.Transaction) (*rpc.ClientSubscription, error)
- func (cs *ChainSubscriber) SubscribeNewHead(ctx context.Context, ch chan<- *etypes.Header) (sub ethereum.Subscription, err error)
- func (cs *ChainSubscriber) SubscribePendingTransactions(ctx context.Context, ch chan<- common.Hash) (*rpc.ClientSubscription, error)
- type FilterLogsFunc
- type FilterTransaction
- type MemoryStorage
- func (s *MemoryStorage) FilterLogs(ctx context.Context, q ethereum.FilterQuery) (logs []types.Log, err error)
- func (s *MemoryStorage) FilterLogsBatch(ctx context.Context, queries []ethereum.FilterQuery) (logs [][]types.Log, err error)
- func (s *MemoryStorage) IsFilterLogsSupported(q ethereum.FilterQuery) bool
- func (s *MemoryStorage) LatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery) (uint64, error)
- func (s *MemoryStorage) LatestLogForQuery(ctx context.Context, query ethereum.FilterQuery) (types.Log, error)
- func (s *MemoryStorage) SaveFilterLogs(q ethereum.FilterQuery, logs []types.Log) (err error)
- func (s *MemoryStorage) SaveLatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery, blockNum uint64) error
- func (s *MemoryStorage) SaveLatestLogForQuery(ctx context.Context, query ethereum.FilterQuery, l types.Log) error
- type Query
- type QueryHandler
- type QueryStateReader
- type QueryStateWriter
- type QueryWithChannel
- type RedisStorage
- func (s *RedisStorage) FilterLogs(ctx context.Context, q ethereum.FilterQuery) (logs []types.Log, err error)
- func (s *RedisStorage) FilterLogsBatch(ctx context.Context, queries []ethereum.FilterQuery) (logs [][]types.Log, err error)
- func (s *RedisStorage) IsFilterLogsSupported(q ethereum.FilterQuery) bool
- func (s *RedisStorage) LatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery) (uint64, error)
- func (s *RedisStorage) LatestLogForQuery(ctx context.Context, query ethereum.FilterQuery) (types.Log, error)
- func (s *RedisStorage) QueryLock(q ethereum.FilterQuery) sync.Locker
- func (s *RedisStorage) SaveFilterLogs(q ethereum.FilterQuery, logs []types.Log) (err error)
- func (s *RedisStorage) SaveLatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery, blockNum uint64) error
- func (s *RedisStorage) SaveLatestLogForQuery(ctx context.Context, query ethereum.FilterQuery, log types.Log) error
- type Subscriber
- type SubscriberStorage
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GetQueryHash ¶ added in v0.0.2
func GetQueryKey ¶
func GetQueryKey(chainId *big.Int, query ethereum.FilterQuery) string
Types ¶
type ChainSubscriber ¶
type ChainSubscriber struct {
// contains filtered or unexported fields
}
ChainSubscriber implements Subscriber interface
func NewChainSubscriber ¶
func NewChainSubscriber(rpcCli *rpc.Client, storage SubscriberStorage) (*ChainSubscriber, error)
NewChainSubscriber .
func (*ChainSubscriber) Close ¶ added in v0.0.7
func (s *ChainSubscriber) Close()
func (*ChainSubscriber) DeleteQuery ¶ added in v0.1.27
func (cs *ChainSubscriber) DeleteQuery(query ethereum.FilterQuery) error
func (*ChainSubscriber) FilterLogs ¶
func (cs *ChainSubscriber) FilterLogs(ctx context.Context, q ethereum.FilterQuery) (logs []etypes.Log, err error)
func (*ChainSubscriber) FilterLogsBatch ¶ added in v0.1.27
func (cs *ChainSubscriber) FilterLogsBatch(ctx context.Context, queries []ethereum.FilterQuery) (logs [][]etypes.Log, err error)
func (*ChainSubscriber) FilterLogsWithChannel ¶
func (cs *ChainSubscriber) FilterLogsWithChannel(ctx context.Context, q ethereum.FilterQuery, logsChan chan<- etypes.Log, watch bool, closeOnExit bool) (err error)
TODO: 3. cache all of finalized historical data, e.g., blockByHash, txByHash
func (*ChainSubscriber) GetBlockConfirmationsOnSubscription ¶ added in v0.0.2
func (cs *ChainSubscriber) GetBlockConfirmationsOnSubscription() uint64
func (*ChainSubscriber) GetQueryHandler ¶ added in v0.0.9
func (cs *ChainSubscriber) GetQueryHandler() QueryHandler
func (*ChainSubscriber) SetBlockConfirmationsOnSubscription ¶ added in v0.0.2
func (cs *ChainSubscriber) SetBlockConfirmationsOnSubscription(confirmations uint64)
func (*ChainSubscriber) SetBlocksPerScan ¶ added in v0.0.6
func (s *ChainSubscriber) SetBlocksPerScan(blocksPerScan uint64)
func (*ChainSubscriber) SetBuffer ¶ added in v0.0.10
func (cs *ChainSubscriber) SetBuffer(buffer int)
func (*ChainSubscriber) SetFetchMissingHeaders ¶ added in v0.1.11
func (cs *ChainSubscriber) SetFetchMissingHeaders(enable bool)
func (*ChainSubscriber) SetMaxBlocksPerScan ¶ added in v0.1.4
func (s *ChainSubscriber) SetMaxBlocksPerScan(maxBlocksPerScan uint64)
func (*ChainSubscriber) SetQueryHandler ¶ added in v0.0.7
func (cs *ChainSubscriber) SetQueryHandler(handler QueryHandler)
func (*ChainSubscriber) SetRetryInterval ¶ added in v0.0.6
func (s *ChainSubscriber) SetRetryInterval(retryInterval time.Duration)
func (*ChainSubscriber) SubmitQuery ¶ added in v0.0.7
func (cs *ChainSubscriber) SubmitQuery(query ethereum.FilterQuery) error
func (*ChainSubscriber) SubscribeFilterFullPendingTransactions ¶ added in v0.1.18
func (cs *ChainSubscriber) SubscribeFilterFullPendingTransactions(ctx context.Context, filter FilterTransaction, ch chan<- *etypes.Transaction) (*rpc.ClientSubscription, error)
func (*ChainSubscriber) SubscribeFilterFullTransactions ¶ added in v0.1.18
func (cs *ChainSubscriber) SubscribeFilterFullTransactions(ctx context.Context, filter FilterTransaction, ch chan<- *etypes.Transaction) (ethereum.Subscription, error)
func (*ChainSubscriber) SubscribeFilterLogs ¶
func (cs *ChainSubscriber) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- etypes.Log) (sub ethereum.Subscription, err error)
func (*ChainSubscriber) SubscribeFullPendingTransactions ¶ added in v0.1.18
func (cs *ChainSubscriber) SubscribeFullPendingTransactions(ctx context.Context, ch chan<- *etypes.Transaction) (*rpc.ClientSubscription, error)
SubscribeFullPendingTransactions subscribes to new pending transactions.
func (*ChainSubscriber) SubscribeNewHead ¶
func (cs *ChainSubscriber) SubscribeNewHead(ctx context.Context, ch chan<- *etypes.Header) (sub ethereum.Subscription, err error)
SubscribeNewHead .
func (*ChainSubscriber) SubscribePendingTransactions ¶ added in v0.1.18
func (cs *ChainSubscriber) SubscribePendingTransactions(ctx context.Context, ch chan<- common.Hash) (*rpc.ClientSubscription, error)
SubscribePendingTransactions subscribes to new pending transaction hashes.
type FilterLogsFunc ¶ added in v0.1.27
type FilterTransaction ¶ added in v0.1.18
type FilterTransaction struct {
FromBlock *big.Int // beginning of the queried range, nil means genesis block. only used for historical data
ToBlock *big.Int // end of the range, nil means latest block. only used for historical data
// Any of these conditions meet, the transaction is considered.
From []common.Address
To []common.Address
MethodSelector []types.MethodSelector
}
type MemoryStorage ¶
type MemoryStorage struct {
// contains filtered or unexported fields
}
func NewMemoryStorage ¶
func NewMemoryStorage(chainId *big.Int) *MemoryStorage
func (*MemoryStorage) FilterLogs ¶ added in v0.0.11
func (s *MemoryStorage) FilterLogs(ctx context.Context, q ethereum.FilterQuery) (logs []types.Log, err error)
func (*MemoryStorage) FilterLogsBatch ¶ added in v0.1.27
func (s *MemoryStorage) FilterLogsBatch(ctx context.Context, queries []ethereum.FilterQuery) (logs [][]types.Log, err error)
func (*MemoryStorage) IsFilterLogsSupported ¶ added in v0.0.11
func (s *MemoryStorage) IsFilterLogsSupported(q ethereum.FilterQuery) bool
func (*MemoryStorage) LatestBlockForQuery ¶
func (s *MemoryStorage) LatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery) (uint64, error)
func (*MemoryStorage) LatestLogForQuery ¶
func (s *MemoryStorage) LatestLogForQuery(ctx context.Context, query ethereum.FilterQuery) (types.Log, error)
func (*MemoryStorage) SaveFilterLogs ¶ added in v0.0.11
func (s *MemoryStorage) SaveFilterLogs(q ethereum.FilterQuery, logs []types.Log) (err error)
func (*MemoryStorage) SaveLatestBlockForQuery ¶
func (s *MemoryStorage) SaveLatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery, blockNum uint64) error
func (*MemoryStorage) SaveLatestLogForQuery ¶
func (s *MemoryStorage) SaveLatestLogForQuery(ctx context.Context, query ethereum.FilterQuery, l types.Log) error
type QueryHandler ¶ added in v0.0.7
type QueryHandler interface {
// Please update query states by handler-self, otherwise
// logs may be replayed
SubscriberStorage
// Subscriber will call back it for handling when incoming logs are ready.
// If log.Address is address(0), just for updating block number
HandleQuery(ctx context.Context, query Query, log etypes.Log) error
}
Used only for handler set && query.ToBlock == nil
type QueryStateReader ¶ added in v0.0.7
type QueryStateReader interface {
LatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery) (uint64, error)
LatestLogForQuery(ctx context.Context, query ethereum.FilterQuery) (etypes.Log, error)
// Save query result to save network io
FilterLogs(ctx context.Context, q ethereum.FilterQuery) (logs []etypes.Log, err error)
// Report whether client can use `FilterLogs` in the storage instead of ethclient.FilterLogs
IsFilterLogsSupported(q ethereum.FilterQuery) bool
FilterLogsBatch(ctx context.Context, queries []ethereum.FilterQuery) (logs [][]etypes.Log, err error)
}
type QueryStateWriter ¶ added in v0.0.7
type QueryStateWriter interface {
// Must call the function after all logs was handled for the block.
SaveLatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery, blockNum uint64) error
// Must call the function after each log was handled .
SaveLatestLogForQuery(ctx context.Context, query ethereum.FilterQuery, log etypes.Log) error
// Save query result to save network io
SaveFilterLogs(q ethereum.FilterQuery, logs []etypes.Log) (err error)
}
type QueryWithChannel ¶ added in v0.1.27
type QueryWithChannel struct {
Query
// contains filtered or unexported fields
}
type RedisStorage ¶
type RedisStorage struct {
// contains filtered or unexported fields
}
func NewRedisStorage ¶
func NewRedisStorage(chainId *big.Int, pool redis.Pool) *RedisStorage
func (*RedisStorage) FilterLogs ¶ added in v0.0.11
func (s *RedisStorage) FilterLogs(ctx context.Context, q ethereum.FilterQuery) (logs []types.Log, err error)
func (*RedisStorage) FilterLogsBatch ¶ added in v0.1.27
func (s *RedisStorage) FilterLogsBatch(ctx context.Context, queries []ethereum.FilterQuery) (logs [][]types.Log, err error)
func (*RedisStorage) IsFilterLogsSupported ¶ added in v0.0.11
func (s *RedisStorage) IsFilterLogsSupported(q ethereum.FilterQuery) bool
func (*RedisStorage) LatestBlockForQuery ¶
func (s *RedisStorage) LatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery) (uint64, error)
func (*RedisStorage) LatestLogForQuery ¶
func (s *RedisStorage) LatestLogForQuery(ctx context.Context, query ethereum.FilterQuery) (types.Log, error)
func (*RedisStorage) QueryLock ¶
func (s *RedisStorage) QueryLock(q ethereum.FilterQuery) sync.Locker
func (*RedisStorage) SaveFilterLogs ¶ added in v0.0.11
func (s *RedisStorage) SaveFilterLogs(q ethereum.FilterQuery, logs []types.Log) (err error)
func (*RedisStorage) SaveLatestBlockForQuery ¶
func (s *RedisStorage) SaveLatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery, blockNum uint64) error
func (*RedisStorage) SaveLatestLogForQuery ¶
func (s *RedisStorage) SaveLatestLogForQuery(ctx context.Context, query ethereum.FilterQuery, log types.Log) error
type Subscriber ¶
type Subscriber interface {
Close()
GetQueryHandler() QueryHandler
GetBlockConfirmationsOnSubscription() uint64
SetBuffer(buffer int)
SetBlockConfirmationsOnSubscription(confirmations uint64)
SetQueryHandler(handler QueryHandler) // use QueryHandler instead of SubscriberStorage if handler set
SetFetchMissingHeaders(enable bool)
// Provided for handler submitting query.
SubmitQuery(query ethereum.FilterQuery) error
DeleteQuery(query ethereum.FilterQuery) error
SubscribeNewHead(ctx context.Context, ch chan<- *etypes.Header) (ethereum.Subscription, error)
SubscribeFilterFullTransactions(ctx context.Context, filter FilterTransaction, ch chan<- *etypes.Transaction) (ethereum.Subscription, error)
// SubscribeFullPendingTransactions subscribes to new pending transactions.
SubscribeFullPendingTransactions(ctx context.Context, ch chan<- *etypes.Transaction) (*rpc.ClientSubscription, error)
// SubscribePendingTransactions subscribes to new pending transaction hashes.
SubscribePendingTransactions(ctx context.Context, ch chan<- common.Hash) (*rpc.ClientSubscription, error)
SubscribeFilterFullPendingTransactions(ctx context.Context, filter FilterTransaction, ch chan<- *etypes.Transaction) (*rpc.ClientSubscription, error)
SubscribeFilterLogs(ctx context.Context, query ethereum.FilterQuery, ch chan<- etypes.Log) (ethereum.Subscription, error)
FilterLogs(ctx context.Context, q ethereum.FilterQuery) (logs []etypes.Log, err error)
}
Subscriber represents a set of methods about chain subscription
type SubscriberStorage ¶
type SubscriberStorage interface {
QueryStateReader
QueryStateWriter
}
Used only for function `SubscribeFilterlogs` && query.ToBlock == nil
Source Files
¶
Click to show internal directories.
Click to hide internal directories.