Documentation
¶
Index ¶
- Constants
- func AddListener(name string, ...)
- type BaseJob
- func (e *BaseJob) CreatedAt() time.Time
- func (e *BaseJob) FromChainID() *big.Int
- func (e *BaseJob) GetBackOff() int
- func (e *BaseJob) GetData() []byte
- func (e *BaseJob) GetID() int32
- func (e *BaseJob) GetListener() Listener
- func (e *BaseJob) GetMaxTry() int
- func (e *BaseJob) GetNextTry() int64
- func (e *BaseJob) GetRetryCount() int
- func (e *BaseJob) GetSubscriptionName() string
- func (e *BaseJob) GetTransaction() Transaction
- func (e *BaseJob) GetType() int
- func (e *BaseJob) GetValue() *big.Int
- func (e *BaseJob) Hash() common.Hash
- func (e *BaseJob) IncreaseRetryCount()
- func (e *BaseJob) Process() ([]byte, error)
- func (e *BaseJob) Save(status string) error
- func (e *BaseJob) SetID(id int32)
- func (e *BaseJob) String() string
- func (e *BaseJob) UpdateNextTry(nextTry int64)
- func (e *BaseJob) Utils() utils.Utils
- type Block
- type BridgeWorker
- type Config
- type Controller
- type EmptyTransaction
- type Handler
- type Job
- type JobHandler
- type Listener
- type ListenerStats
- type Log
- type LsConfig
- type Pool
- func (p *Pool) AddWorkers(workers []Worker)
- func (p *Pool) Enqueue(job JobHandler)
- func (p *Pool) IsClosed() bool
- func (p *Pool) PrepareRetryableJob(job JobHandler)
- func (p *Pool) RetryJob(job JobHandler)
- func (p *Pool) SendJobToWorker(workerCh chan JobHandler, job JobHandler)
- func (p *Pool) Start(closeFunc func())
- func (p *Pool) Stats() Stats
- func (p *Pool) Wait()
- type Receipt
- type Secret
- type Stats
- type Subscribe
- type TaskHandler
- type Transaction
- type Worker
Constants ¶
View Source
const ( ListenHandler = iota CallbackHandler )
View Source
const ( TxEvent = iota LogEvent )
Variables ¶
This section is empty.
Functions ¶
Types ¶
type BaseJob ¶
type BaseJob struct {
// contains filtered or unexported fields
}
func NewBaseJob ¶
func (*BaseJob) FromChainID ¶
func (*BaseJob) GetBackOff ¶
func (*BaseJob) GetListener ¶
func (*BaseJob) GetNextTry ¶
func (*BaseJob) GetRetryCount ¶
func (*BaseJob) GetSubscriptionName ¶
func (*BaseJob) GetTransaction ¶
func (e *BaseJob) GetTransaction() Transaction
func (*BaseJob) IncreaseRetryCount ¶
func (e *BaseJob) IncreaseRetryCount()
func (*BaseJob) UpdateNextTry ¶
type BridgeWorker ¶
type BridgeWorker struct {
// contains filtered or unexported fields
}
func (*BridgeWorker) Channel ¶
func (w *BridgeWorker) Channel() chan JobHandler
func (*BridgeWorker) Close ¶
func (w *BridgeWorker) Close()
func (*BridgeWorker) Context ¶
func (w *BridgeWorker) Context() context.Context
func (*BridgeWorker) ProcessJob ¶
func (w *BridgeWorker) ProcessJob(job JobHandler) error
func (*BridgeWorker) Stop ¶
func (w *BridgeWorker) Stop()
func (*BridgeWorker) String ¶
func (w *BridgeWorker) String() string
func (*BridgeWorker) Wait ¶
func (w *BridgeWorker) Wait()
type Config ¶
type Config struct {
Listeners map[string]*LsConfig `json:"listeners" mapstructure:"listeners"`
NumberOfWorkers int `json:"numberOfWorkers" mapstructure:"numberOfWorkers"`
MaxQueueSize int `json:"maxQueueSize" mapstructure:"maxQueueSize"`
MaxRetry int32 `json:"maxRetry" mapstructure:"maxRetry"`
BackOff int32 `json:"backoff" mapstructure:"backoff"`
DB *stores.Database `json:"database" mapstructure:"database"`
// this field is used for testing purpose
Testing bool
}
type Controller ¶
type Controller struct {
HandlerABIs map[string]*abi.ABI
Pool *Pool
// contains filtered or unexported fields
}
func NewWithContext ¶ added in v0.1.3
func (*Controller) Close ¶
func (c *Controller) Close()
func (*Controller) LoadABIsFromConfig ¶
func (c *Controller) LoadABIsFromConfig(lsConfig *LsConfig) (err error)
LoadABIsFromConfig loads all ABIPath and add results to Handler.ABI
func (*Controller) Start ¶
func (c *Controller) Start() error
type EmptyTransaction ¶
type EmptyTransaction struct {
// contains filtered or unexported fields
}
func NewEmptyTransaction ¶
func (*EmptyTransaction) GetData ¶
func (b *EmptyTransaction) GetData() []byte
func (*EmptyTransaction) GetFromAddress ¶
func (b *EmptyTransaction) GetFromAddress() string
func (*EmptyTransaction) GetHash ¶
func (b *EmptyTransaction) GetHash() common.Hash
func (*EmptyTransaction) GetToAddress ¶
func (b *EmptyTransaction) GetToAddress() string
func (*EmptyTransaction) GetValue ¶
func (b *EmptyTransaction) GetValue() *big.Int
type Handler ¶
type Handler struct {
// Contract Name that will be used to get ABI
Contract string `json:"contract" mapstructure:"contract"`
// Name is method/event name
Name string `json:"name" mapstructure:"name"`
// ContractAddress is used in callback case
ContractAddress string `json:"contractAddress" mapstructure:"contractAddress"`
// Listener who triggers callback event
Listener string `json:"listener" mapstructure:"listener"`
ABI *abi.ABI `json:"-"`
// HandleMethod is used when processing listened job, do nothing if it is empty
HandleMethod string `json:"handleMethod" mapstructure:"handleMethod"`
}
type Job ¶
type JobHandler ¶
type JobHandler interface {
GetID() int32
GetType() int
GetRetryCount() int
GetNextTry() int64
GetMaxTry() int
GetData() []byte
GetValue() *big.Int
GetBackOff() int
Process() ([]byte, error)
Hash() common.Hash
IncreaseRetryCount()
UpdateNextTry(int64)
GetListener() Listener
GetSubscriptionName() string
GetTransaction() Transaction
FromChainID() *big.Int
Save(string) error
CreatedAt() time.Time
String() string
}
type Listener ¶
type Listener interface {
GetName() string
GetStore() stores.MainStore
Config() *LsConfig
Period() time.Duration
GetSafeBlockRange() uint64
GetCurrentBlock() Block
GetLatestBlock() (Block, error)
GetLatestBlockHeight() (uint64, error)
GetBlock(height uint64) (Block, error)
GetBlockWithLogs(height uint64) (Block, error)
GetChainID() (*big.Int, error)
GetReceipt(common.Hash) (*types.Receipt, error)
Context() context.Context
GetSubscriptions() map[string]*Subscribe
UpdateCurrentBlock(block Block) error
SaveCurrentBlockToDB() error
SaveTransactionsToDB(txs []Transaction) error
GetListenHandleJob(subscriptionName string, tx Transaction, eventId string, data []byte) JobHandler
SendCallbackJobs(listeners map[string]Listener, subscriptionName string, tx Transaction, inputData []byte)
NewJobFromDB(job *models.Job) (JobHandler, error)
Start()
Close()
IsDisabled() bool
SetInitHeight(uint64)
GetInitHeight() uint64
GetEthClient() utils.EthClient
GetTasks() []TaskHandler
GetTask(index int) TaskHandler
AddTask(handler TaskHandler)
IsUpTodate() bool
GetBridgeOperatorSign() utils.ISign
GetVoterSign() utils.ISign
GetRelayerSign() utils.ISign
GetLegacyBridgeOperatorSign() utils.ISign
AddListeners(map[string]Listener)
// GetListener returns listener by name
GetListener(string) Listener
CacheBlocks(blockNumbers map[uint64]struct{})
}
type ListenerStats ¶ added in v0.1.3
type LsConfig ¶
type LsConfig struct {
ChainId string `json:"chainId" mapstructure:"chainId"`
Name string `json:"-"`
RpcUrl string `json:"rpcUrl" mapstructure:"rpcUrl"`
LoadInterval time.Duration `json:"blockTime" mapstructure:"blockTime"`
SafeBlockRange uint64 `json:"safeBlockRange" mapstructure:"safeBlockRange"`
FromHeight uint64 `json:"fromHeight" mapstructure:"fromHeight"`
TaskInterval time.Duration `json:"taskInterval" mapstructure:"taskInterval"`
Disabled bool `json:"disabled" mapstructure:"disabled"`
// TODO: apply more ways to get privatekey. such as: PLAINTEXT, KMS, etc.
Secret *Secret `json:"secret" mapstructure:"secret"`
Subscriptions map[string]*Subscribe `json:"subscriptions" mapstructure:"subscriptions"`
TransactionCheckPeriod time.Duration `json:"transactionCheckPeriod" mapstructure:"transactionCheckPeriod"`
Contracts map[string]string `json:"contracts" mapstructure:"contracts"`
ProcessWithinBlocks uint64 `json:"processWithinBlocks" mapstructure:"processWithinBlocks"`
MaxTasksQuery int `json:"maxTasksQuery" mapstructure:"maxTasksQuery"`
MinTasksQuery int `json:"minTasksQuery" mapstructure:"minTasksQuery"`
// GetLogsBatchSize is used at batch size when calling processBatchLogs
GetLogsBatchSize int `json:"getLogsBatchSize" mapstructure:"getLogsBatchSize"`
// MaxProcessingTasks is used to specify max processing tasks allowed while processing tasks
// if number of tasks reaches this number, it waits until this number decrease
MaxProcessingTasks int `json:"maxProcessingTasks" mapstructure:"maxProcessingTasks"`
GasLimitBumpRatio uint64 `json:"gasLimitBumpRatio" mapstructure:"gasLimitBumpRatio"`
Stats *ListenerStats `json:"stats" mapstructure:"stats"`
}
type Pool ¶
type Pool struct {
Workers []Worker
// message backoff
MaxRetry int32
BackOff int32
// Queue holds a list of worker
Queue chan chan JobHandler
// JobChan receives new job
JobChan chan JobHandler
RetryJobChan chan JobHandler
FailedJobChan chan JobHandler
MaxQueueSize int
// contains filtered or unexported fields
}
func (*Pool) AddWorkers ¶
func (*Pool) Enqueue ¶
func (p *Pool) Enqueue(job JobHandler)
func (*Pool) PrepareRetryableJob ¶
func (p *Pool) PrepareRetryableJob(job JobHandler)
func (*Pool) RetryJob ¶
func (p *Pool) RetryJob(job JobHandler)
func (*Pool) SendJobToWorker ¶
func (p *Pool) SendJobToWorker(workerCh chan JobHandler, job JobHandler)
type Receipt ¶
type Receipt interface {
GetTransaction() Transaction
GetStatus() bool
GetLogs() []Log
}
type Secret ¶
type Secret struct {
BridgeOperator *utils.SignMethodConfig `json:"bridgeOperator" mapstructure:"bridgeOperator"`
Voter *utils.SignMethodConfig `json:"voter" mapstructure:"voter"`
Relayer *utils.SignMethodConfig `json:"relayer" mapstructure:"relayer"`
LegacyBridgeOperator *utils.SignMethodConfig `json:"legacyBridgeOperator" mapstructure:"legacyBridgeOperator"`
}
type Subscribe ¶
type Subscribe struct {
From string `json:"from" mapstructure:"from"`
To string `json:"to" mapstructure:"to"`
// Type can be either TxEvent or LogEvent
Type int `json:"type" mapstructure:"type"`
Handler *Handler `json:"handler" mapstructure:"handler"`
CallBacks map[string]string `json:"callbacks" mapstructure:"callbacks"`
}
type TaskHandler ¶
type Transaction ¶
type Worker ¶
type Worker interface {
Context() context.Context
Close()
ProcessJob(job JobHandler) error
Stop()
Channel() chan JobHandler
Wait()
}
Click to show internal directories.
Click to hide internal directories.