Documentation
¶
Index ¶
- Constants
- func AddListener(name string, ...)
- type BaseJob
- func (e *BaseJob) CreatedAt() time.Time
- func (e *BaseJob) FromChainID() *big.Int
- func (e *BaseJob) GetBackOff() int
- func (e *BaseJob) GetData() []byte
- func (e *BaseJob) GetID() int32
- func (e *BaseJob) GetListener() Listener
- func (e *BaseJob) GetMaxTry() int
- func (e *BaseJob) GetNextTry() int64
- func (e *BaseJob) GetRetryCount() int
- func (e *BaseJob) GetSubscriptionName() string
- func (e *BaseJob) GetTransaction() Transaction
- func (e *BaseJob) GetType() int
- func (e *BaseJob) GetValue() *big.Int
- func (e *BaseJob) Hash() common.Hash
- func (e *BaseJob) IncreaseRetryCount()
- func (e *BaseJob) Process() ([]byte, error)
- func (e *BaseJob) Save() error
- func (e *BaseJob) SetID(id int32)
- func (e *BaseJob) String() string
- func (e *BaseJob) Update(status string) error
- func (e *BaseJob) UpdateNextTry(nextTry int64)
- func (e *BaseJob) Utils() utils.Utils
- type Block
- type BridgeWorker
- func (w *BridgeWorker) Channel() chan JobHandler
- func (w *BridgeWorker) Close()
- func (w *BridgeWorker) Context() context.Context
- func (w *BridgeWorker) FailedChannel() chan<- JobHandler
- func (w *BridgeWorker) IsClose() bool
- func (w *BridgeWorker) PoolChannel() chan<- JobHandler
- func (w *BridgeWorker) ProcessJob(job JobHandler) error
- func (w *BridgeWorker) String() string
- func (w *BridgeWorker) WorkersQueue() chan chan JobHandler
- type Config
- type Controller
- type EmptyTransaction
- type Handler
- type Job
- type JobHandler
- type Listener
- type Log
- type LsConfig
- type Pool
- type Receipt
- type Secret
- type Stats
- type Subscribe
- type TaskHandler
- type Transaction
- type Worker
Constants ¶
View Source
const (
ListenHandler = iota
CallbackHandler
)
View Source
const (
TxEvent = iota
LogEvent
)
Variables ¶
This section is empty.
Functions ¶
Types ¶
type BaseJob ¶
type BaseJob struct {
// contains filtered or unexported fields
}
func NewBaseJob ¶
func (*BaseJob) FromChainID ¶
func (*BaseJob) GetBackOff ¶
func (*BaseJob) GetListener ¶
func (*BaseJob) GetNextTry ¶
func (*BaseJob) GetRetryCount ¶
func (*BaseJob) GetSubscriptionName ¶
func (*BaseJob) GetTransaction ¶
func (e *BaseJob) GetTransaction() Transaction
func (*BaseJob) IncreaseRetryCount ¶
func (e *BaseJob) IncreaseRetryCount()
func (*BaseJob) UpdateNextTry ¶
type BridgeWorker ¶
type BridgeWorker struct {
// contains filtered or unexported fields
}
func NewWorker ¶
func NewWorker(ctx context.Context, id int, mainChan, failedChan chan<- JobHandler, queue chan chan JobHandler, size int, listeners map[string]Listener) *BridgeWorker
func (*BridgeWorker) Channel ¶
func (w *BridgeWorker) Channel() chan JobHandler
func (*BridgeWorker) Close ¶
func (w *BridgeWorker) Close()
func (*BridgeWorker) Context ¶
func (w *BridgeWorker) Context() context.Context
func (*BridgeWorker) FailedChannel ¶
func (w *BridgeWorker) FailedChannel() chan<- JobHandler
func (*BridgeWorker) IsClose ¶
func (w *BridgeWorker) IsClose() bool
func (*BridgeWorker) PoolChannel ¶
func (w *BridgeWorker) PoolChannel() chan<- JobHandler
func (*BridgeWorker) ProcessJob ¶
func (w *BridgeWorker) ProcessJob(job JobHandler) error
func (*BridgeWorker) String ¶
func (w *BridgeWorker) String() string
func (*BridgeWorker) WorkersQueue ¶
func (w *BridgeWorker) WorkersQueue() chan chan JobHandler
type Config ¶
type Config struct {
Listeners map[string]*LsConfig `json:"listeners"`
SlackUrl string `json:"slackUrl"`
ScanUrl string `json:"scanUrl"`
NumberOfWorkers int `json:"numberOfWorkers"`
MaxQueueSize int `json:"maxQueueSize"`
MaxRetry int32 `json:"maxRetry"`
BackOff int32 `json:"backoff"`
DB *stores.Database `json:"database"`
// this field is used for testing purposes only
Testing bool
}
type Controller ¶
type Controller struct {
HandlerABIs map[string]*abi.ABI
Pool *Pool
// contains filtered or unexported fields
}
func (*Controller) Close ¶
func (c *Controller) Close()
func (*Controller) LoadABIsFromConfig ¶
func (c *Controller) LoadABIsFromConfig(lsConfig *LsConfig) (err error)
LoadABIsFromConfig loads all ABIPath entries and adds the results to Handler.ABI.
func (*Controller) Start ¶
func (c *Controller) Start() error
type EmptyTransaction ¶
type EmptyTransaction struct {
// contains filtered or unexported fields
}
func NewEmptyTransaction ¶
func (*EmptyTransaction) GetData ¶
func (b *EmptyTransaction) GetData() []byte
func (*EmptyTransaction) GetFromAddress ¶
func (b *EmptyTransaction) GetFromAddress() string
func (*EmptyTransaction) GetHash ¶
func (b *EmptyTransaction) GetHash() common.Hash
func (*EmptyTransaction) GetToAddress ¶
func (b *EmptyTransaction) GetToAddress() string
func (*EmptyTransaction) GetValue ¶
func (b *EmptyTransaction) GetValue() *big.Int
type Handler ¶
type Handler struct {
// Contract is the contract name that will be used to get the ABI
Contract string `json:"contract"`
// Name is method/event name
Name string `json:"name"`
// ContractAddress is used in the callback case
ContractAddress string `json:"contractAddress"`
// Listener is the listener that triggers the callback event
Listener string `json:"listener"`
ABI *abi.ABI `json:"-"`
// HandleMethod is used when processing a listened job; nothing is done if it is empty
HandleMethod string `json:"handleMethod"`
}
type Job ¶
type JobHandler ¶
type JobHandler interface {
GetID() int32
GetType() int
GetRetryCount() int
GetNextTry() int64
GetMaxTry() int
GetData() []byte
GetValue() *big.Int
GetBackOff() int
Process() ([]byte, error)
Hash() common.Hash
IncreaseRetryCount()
UpdateNextTry(int64)
GetListener() Listener
GetSubscriptionName() string
GetTransaction() Transaction
FromChainID() *big.Int
Save() error
Update(string) error
CreatedAt() time.Time
String() string
}
type Listener ¶
type Listener interface {
GetName() string
GetStore() stores.MainStore
Config() *LsConfig
Period() time.Duration
GetSafeBlockRange() uint64
GetPreventOmissionRange() uint64
GetCurrentBlock() Block
GetLatestBlock() (Block, error)
GetLatestBlockHeight() (uint64, error)
GetBlock(height uint64) (Block, error)
GetBlockWithLogs(height uint64) (Block, error)
GetChainID() (*big.Int, error)
GetReceipt(common.Hash) (*types.Receipt, error)
Context() context.Context
GetSubscriptions() map[string]*Subscribe
UpdateCurrentBlock(block Block) error
SaveCurrentBlockToDB() error
SaveTransactionsToDB(txs []Transaction) error
GetListenHandleJob(subscriptionName string, tx Transaction, eventId string, data []byte) JobHandler
SendCallbackJobs(listeners map[string]Listener, subscriptionName string, tx Transaction, inputData []byte)
NewJobFromDB(job *models.Job) (JobHandler, error)
Start()
Close()
IsDisabled() bool
SetInitHeight(uint64)
GetInitHeight() uint64
GetEthClient() utils.EthClient
GetTasks() []TaskHandler
GetTask(index int) TaskHandler
AddTask(handler TaskHandler)
IsUpTodate() bool
SetPrepareJobChan(chan JobHandler)
GetValidatorSign() utils.ISign
AddListeners(map[string]Listener)
// GetListener returns listener by name
GetListener(string) Listener
}
type LsConfig ¶
type LsConfig struct {
ChainId string `json:"chainId"`
Name string `json:"name"`
RpcUrl string `json:"rpcUrl"`
SlackUrl string `json:"slackUrl"`
ScanUrl string `json:"scanUrl"`
LoadInterval time.Duration `json:"blockTime"`
SafeBlockRange uint64 `json:"safeBlockRange"`
PreventOmissionRange uint64 `json:"preventOmissionRange"`
FromHeight uint64 `json:"fromHeight"`
DomainSeparators map[uint64]string `json:"domainSeparators"`
Decimals map[uint64]uint64 `json:"decimals"`
TaskInterval time.Duration `json:"taskInterval"`
Disabled bool `json:"disabled"`
// TODO: support more ways to get the private key, such as PLAINTEXT, KMS, etc.
Secret *Secret `json:"secret"`
Subscriptions map[string]*Subscribe `json:"subscriptions"`
TransactionCheckPeriod time.Duration `json:"transactionCheckPeriod"`
Contracts map[string]string `json:"contracts"`
ProcessWithinBlocks uint64 `json:"processWithinBlocks"`
MaxTasksQuery int `json:"maxTasksQuery"`
MinTasksQuery int `json:"minTasksQuery"`
// GetLogsBatchSize is used as the batch size when calling processBatchLogs
GetLogsBatchSize int `json:"getLogsBatchSize"`
// MaxProcessingTasks specifies the maximum number of tasks allowed to be in processing at once;
// if the number of tasks reaches this limit, it waits until the number decreases
MaxProcessingTasks int `json:"maxProcessingTasks"`
}
type Pool ¶
type Pool struct {
Workers []Worker
// message backoff
MaxRetry int32
BackOff int32
// Queue holds a list of workers
Queue chan chan JobHandler
// JobChan receives new jobs
JobChan chan JobHandler
RetryJobChan chan JobHandler
FailedJobChan chan JobHandler
PrepareJobChan chan JobHandler
MaxQueueSize int
// contains filtered or unexported fields
}
func (*Pool) AddWorkers ¶
func (*Pool) PrepareJob ¶
func (p *Pool) PrepareJob(job JobHandler) error
PrepareJob saves a new job to the database.
func (*Pool) PrepareRetryableJob ¶
func (p *Pool) PrepareRetryableJob(job JobHandler)
type Receipt ¶
type Receipt interface {
GetTransaction() Transaction
GetStatus() bool
GetLogs() []Log
}
type Secret ¶
type Secret struct {
Validator *utils.SignMethodConfig `json:"validator"`
}
type TaskHandler ¶
type Transaction ¶
type Worker ¶
type Worker interface {
Context() context.Context
Close()
ProcessJob(job JobHandler) error
IsClose() bool
Channel() chan JobHandler
PoolChannel() chan<- JobHandler
WorkersQueue() chan chan JobHandler
}
Click to show internal directories.
Click to hide internal directories.