watcher

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 14, 2020 License: GPL-3.0 Imports: 25 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrTxConflict = errors.New("Transaction conflict")
	ErrTxInvalid  = errors.New("Invalid transaction")
	ErrReadOnly   = errors.New("Cannot modify read-only store")
)
View Source
var (
	ErrWatchServiceClosed = errors.New("Watch service closed")
	ErrWatcherTimeout     = errors.New("Watcher timeout")
)
View Source
var (
	ErrNoRows = errors.New("No rows matched in the database")
)

Functions

This section is empty.

Types

type BlockNumber

type BlockNumber interface {
	GetCurrentBlockNumber() (*big.Int, error)
}

type BoundContract

type BoundContract struct {
	*bind.BoundContract
	// contains filtered or unexported fields
}

BoundContract is a binding object for an Ethereum smart contract. It embeds *bind.BoundContract (from go-ethereum).

func NewBoundContract

func NewBoundContract(
	conn *ethclient.Client,
	addr ethcommon.Address,
	rawABI string) (*BoundContract, error)

NewBoundContract creates a new contract binding

func (*BoundContract) CallFunc

func (c *BoundContract) CallFunc(
	result interface{},
	method string,
	params ...interface{}) error

CallFunc invokes a view-only contract method with params as input values and sets the output to result. The result type might be a single field for simple returns, a slice of interfaces for anonymous returns and a struct for named returns.

func (*BoundContract) FilterEvent

func (c *BoundContract) FilterEvent(
	name string,
	opts *bind.FilterOpts,
	event interface{}) (*EventIterator, error)

FilterEvent gets historical events. This function returns an iterator over historical events.

func (*BoundContract) GetABI

func (c *BoundContract) GetABI() string

GetABI returns the contract ABI.

func (*BoundContract) GetAddr

func (c *BoundContract) GetAddr() ethcommon.Address

GetAddr returns the contract address.

func (*BoundContract) GetETHClient

func (c *BoundContract) GetETHClient() *ethclient.Client

GetETHClient returns the Ethereum client.

func (*BoundContract) ParseEvent

func (c *BoundContract) ParseEvent(
	name string,
	log ethtypes.Log,
	event interface{}) error

ParseEvent parses the caught event according to the event template.

func (*BoundContract) SendTransaction

func (c *BoundContract) SendTransaction(
	auth *bind.TransactOpts,
	method string,
	params ...interface{}) (*ethtypes.Transaction, error)

SendTransaction sends transactions to smart contract via bound contract

func (*BoundContract) WatchEvent

func (c *BoundContract) WatchEvent(
	name string,
	opts *bind.WatchOpts,
	done <-chan bool) (ethtypes.Log, error)

WatchEvent subscribes to future events. This function blocks until an event is caught or a done signal is received.

type CallbackID

type CallbackID uint64

CallbackID is the unique callback ID for deadlines and events

type Contract

type Contract interface {
	GetAddr() ethcommon.Address
	GetABI() string
	GetETHClient() *ethclient.Client
	SendTransaction(*bind.TransactOpts, string, ...interface{}) (*ethtypes.Transaction, error)
	CallFunc(interface{}, string, ...interface{}) error
	WatchEvent(string, *bind.WatchOpts, <-chan bool) (ethtypes.Log, error)
	FilterEvent(string, *bind.FilterOpts, interface{}) (*EventIterator, error)
	ParseEvent(string, ethtypes.Log, interface{}) error
}

type DAL

type DAL struct {
	// contains filtered or unexported fields
}

func NewDAL

func NewDAL(store KVStore) *DAL

func (*DAL) DeleteLogEventWatch

func (d *DAL) DeleteLogEventWatch(name string) error

func (*DAL) GetAllLogEventWatchKeys

func (d *DAL) GetAllLogEventWatchKeys() ([]string, error)

func (*DAL) GetLogEventWatch

func (d *DAL) GetLogEventWatch(name string) (*LogEventID, error)

func (*DAL) HasLogEventWatch

func (d *DAL) HasLogEventWatch(name string) (bool, error)

func (*DAL) PutLogEventWatch

func (d *DAL) PutLogEventWatch(name string, id *LogEventID) error

type Deadline

type Deadline struct {
	BlockNum *big.Int
	Callback func()
}

Deadline is the metadata of a deadline

type DeadlineQueue

type DeadlineQueue []*big.Int

DeadlineQueue is the priority queue for deadlines

func (DeadlineQueue) Len

func (dq DeadlineQueue) Len() int

func (DeadlineQueue) Less

func (dq DeadlineQueue) Less(i, j int) bool

func (*DeadlineQueue) Pop

func (dq *DeadlineQueue) Pop() (popped interface{})

func (*DeadlineQueue) Push

func (dq *DeadlineQueue) Push(x interface{})

func (DeadlineQueue) Swap

func (dq DeadlineQueue) Swap(i, j int)

func (*DeadlineQueue) Top

func (dq *DeadlineQueue) Top() (top interface{})

type Event

type Event struct {
	Addr       ethcommon.Address
	RawAbi     string
	Name       string
	WatchName  string
	StartBlock *big.Int
	EndBlock   *big.Int
	BlockDelay uint64
	Callback   func(CallbackID, ethtypes.Log)
	// contains filtered or unexported fields
}

Event is the metadata for an event

type EventIterator

type EventIterator struct {
	Event    interface{}           // Event containing the contract specifics and raw log
	Contract *BoundContract        // Generic contract to use for unpacking event data
	Name     string                // Event name to use for unpacking event data
	Logs     chan ethtypes.Log     // Log channel receiving the found contract events
	Sub      ethereum.Subscription // Subscription for errors, completion and termination
	Done     bool                  // Whether the subscription completed delivering logs
	Fail     error                 // Occurred error to stop iteration
}

EventIterator is returned from FilterEvent and is used to iterate over the raw logs and unpacked data

func (*EventIterator) Close

func (it *EventIterator) Close() error

Close terminates the iteration process, releasing any pending underlying resources.

func (*EventIterator) Error

func (it *EventIterator) Error() error

Error returns any retrieval or parsing error that occurred during filtering.

func (*EventIterator) Next

func (it *EventIterator) Next() (ethtypes.Log, bool)

Next advances the iterator to the subsequent event, returning whether there are any more events found. In case of a retrieval or parsing error, false is returned and Error() can be queried for the exact failure.

type KVStore

type KVStore interface {
	Close()
	OpenTransaction() (Transaction, error)
	Put(table, key string, value interface{}) error
	Get(table, key string, value interface{}) error
	Delete(table, key string) error
	Has(table, key string) (bool, error)
	GetKeysByPrefix(table, prefix string) ([]string, error)
}

KVStore is the interface implemented by the local store (LevelDB wrapper) and by the remote store (gRPC calls to a store server).

type KVStoreLocal

type KVStoreLocal struct {
	// contains filtered or unexported fields
}

func NewKVStoreLocal

func NewKVStoreLocal(rootDir string, readOnly bool) (*KVStoreLocal, error)

Create a new local K/V store at the given root directory.

func (*KVStoreLocal) Close

func (s *KVStoreLocal) Close()

Close the local K/V store.

func (*KVStoreLocal) Delete

func (s *KVStoreLocal) Delete(table, key string) error

Delete the entry for a key within a table's namespace.

func (*KVStoreLocal) Get

func (s *KVStoreLocal) Get(table, key string, value interface{}) error

Extract the value of the given key within a table's namespace into the given variable.

func (*KVStoreLocal) GetKeysByPrefix

func (s *KVStoreLocal) GetKeysByPrefix(table, prefix string) ([]string, error)

Return all keys for a given table and key prefix. The key prefix can be the empty string, which returns all keys within the table.

func (*KVStoreLocal) Has

func (s *KVStoreLocal) Has(table, key string) (bool, error)

Check if an entry exists for the given key within a table's namespace.

func (*KVStoreLocal) OpenTransaction

func (s *KVStoreLocal) OpenTransaction() (Transaction, error)

Start a store transaction.

func (*KVStoreLocal) Put

func (s *KVStoreLocal) Put(table, key string, value interface{}) error

Store a key/value pair within a table's namespace.

type LogEventID

type LogEventID struct {
	BlockNumber uint64 // Number of the block containing the event
	Index       int64  // Index of the event within the block
}

LogEventID tracks the position of a watch event in the event log.

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service struct stores service parameters and registered deadlines and events

func NewService

func NewService(
	watch *WatchService, blockDelay uint64, enabled bool, rpcAddr string) *Service

NewService starts a new monitor service. Currently, if "enabled" is false, event monitoring will be disabled, and the IP address of the cNode given as "rpcAddr" will be printed.

func (*Service) Close

func (s *Service) Close()

Close only sets the events map to an empty map, so that every monitorEvent exits because isEventRemoved is then true.

func (*Service) GetCurrentBlockNumber

func (s *Service) GetCurrentBlockNumber() *big.Int

func (*Service) Init

func (s *Service) Init()

Init creates the event map

func (*Service) Monitor

func (s *Service) Monitor(
	eventName string,
	contract Contract,
	startBlock *big.Int,
	endBlock *big.Int,
	reset bool,
	callback func(CallbackID, ethtypes.Log)) (CallbackID, error)

func (*Service) MonitorEvent

func (s *Service) MonitorEvent(e Event, reset bool) (CallbackID, error)

func (*Service) RegisterDeadline

func (s *Service) RegisterDeadline(d Deadline) CallbackID

RegisterDeadline registers the deadline and returns the ID

func (*Service) RemoveDeadline

func (s *Service) RemoveDeadline(id CallbackID)

RemoveDeadline removes a deadline from the monitor

func (*Service) RemoveEvent

func (s *Service) RemoveEvent(id CallbackID)

RemoveEvent removes an event from the monitor

type Storage

type Storage interface {
	Put(table, key string, value interface{}) error
	Get(table, key string, value interface{}) error
	Delete(table, key string) error
	Has(table, key string) (bool, error)
	GetKeysByPrefix(table, prefix string) ([]string, error)
}

type Transaction

type Transaction interface {
	Commit() error
	Discard()
	ConvertError(err error) error
	Put(table, key string, value interface{}) error
	Get(table, key string, value interface{}) error
	Delete(table, key string) error
	Has(table, key string) (bool, error)
	GetKeysByPrefix(table, prefix string) ([]string, error)
}

Transaction is the interface implemented by the local and remote stores.

type TransactionLocal

type TransactionLocal struct {
	// contains filtered or unexported fields
}

func (*TransactionLocal) Commit

func (tx *TransactionLocal) Commit() error

Commit a transaction.

func (*TransactionLocal) ConvertError

func (tx *TransactionLocal) ConvertError(err error) error

func (*TransactionLocal) Delete

func (tx *TransactionLocal) Delete(table, key string) error

In a transaction, delete the entry for a key within a table's namespace.

func (*TransactionLocal) Discard

func (tx *TransactionLocal) Discard()

Discard a transaction.

func (*TransactionLocal) Get

func (tx *TransactionLocal) Get(table, key string, value interface{}) error

In a transaction, extract the value of the given key within a table's namespace into the given variable.

func (*TransactionLocal) GetKeysByPrefix

func (tx *TransactionLocal) GetKeysByPrefix(table, prefix string) ([]string, error)

In a transaction, return all keys for a given table and key prefix. The key prefix can be the empty string, which returns all keys within the table.

func (*TransactionLocal) Has

func (tx *TransactionLocal) Has(table, key string) (bool, error)

In a transaction, check if an entry exists for the given key within a table's namespace.

func (*TransactionLocal) Put

func (tx *TransactionLocal) Put(table, key string, value interface{}) error

In a transaction, store a key/value pair within a table's namespace.

type Watch

type Watch struct {
	// contains filtered or unexported fields
}

Watch provides an iterator over a stream of event logs that match an Ethereum filtering query. It updates the KVStore to persist the position in the stream of the last event log that the application has acknowledged receiving. To handle chain reorganization (ephemeral forking), the watch only requests on-chain event logs that are older than a specified number of on-chain blocks.

func (*Watch) Ack

func (w *Watch) Ack() error

The app ACKs the complete processing of the last received event log. Be lenient in one case: after the watch is closed, allow at most one more ACK to be done. This allows event processing that was completed by the application when an asynchronous Close() took place (between the Next() and the Ack() calls) to be persisted into storage instead of having it be re-done after the application is restarted.

func (*Watch) Close

func (w *Watch) Close()

Close a watch subscription.

func (*Watch) Next

func (w *Watch) Next() (types.Log, error)

Fetch the next log event. The function will block until either an event log is available, or the watcher is closed.

type WatchClient

type WatchClient interface {
	HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
	FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error)
}

WatchClient is an interface for the subset of functions of the Go-Ethereum client API that the watch service uses.

type WatchDAL

type WatchDAL interface {
	GetLogEventWatch(name string) (*LogEventID, error)
	PutLogEventWatch(name string, id *LogEventID) error
	DeleteLogEventWatch(name string) error
	HasLogEventWatch(name string) (bool, error)
	GetAllLogEventWatchKeys() ([]string, error)
}

WatchDAL is an interface for the watch-specific API of the KVStore data access layer.

type WatchService

type WatchService struct {
	// contains filtered or unexported fields
}

WatchService holds the active watchers and their connections to the Ethereum client and the KVStore persistence layer that provides resumability of the watcher after a restart.

func NewWatchService

func NewWatchService(client WatchClient, dal WatchDAL, polling uint64) *WatchService

Create a watch service.

func (*WatchService) Close

func (ws *WatchService) Close()

Close the watch service.

func (*WatchService) GetBlockNumber

func (ws *WatchService) GetBlockNumber() uint64

Return the most recent on-chain block number.

func (*WatchService) GetCurrentBlockNumber

func (ws *WatchService) GetCurrentBlockNumber() *big.Int

Return the most recent on-chain block number in big.Int format.

func (*WatchService) MakeFilterQuery

func (ws *WatchService) MakeFilterQuery(addr ethcommon.Address, rawABI string, eventName string, startBlock *big.Int) (ethereum.FilterQuery, error)

MakeFilterQuery constructs an Ethereum FilterQuery structure from these event and contract parameters: address, raw ABI string, event name, and the optional start block number.

func (*WatchService) NewWatch

func (ws *WatchService) NewWatch(name string, query ethereum.FilterQuery, blkDelay, blkInterval uint64, reset bool) (*Watch, error)

Create a watch for the given Ethereum log filtering query. The block delay is the number of mined blocks used as a time delay for fetching event logs, mitigating the effects of chain reorg. The block interval controls the polling frequency for fetching logs from on-chain, measured in block numbers (as a delta). If "reset" is enabled, the watcher ignores the previously stored position in the subscription, which resets the stream to its start.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL