Documentation
¶
Index ¶
- Variables
- func Main(version string) cliapp.LifecycleAction
- type BlockBuffer
- type CLIConfig
- type FailsafeClient
- type FinalityCallback
- type Finder
- type FinderClient
- type InteropMessageMetrics
- type InteropMonitorConfig
- type InteropMonitorService
- func (ms *InteropMonitorService) Kill() error
- func (ms *InteropMonitorService) RouteNewJob(job *Job)
- func (ms *InteropMonitorService) SetExpiry(chainID eth.ChainID, expiry eth.BlockInfo)
- func (ms *InteropMonitorService) Start(ctx context.Context) error
- func (ms *InteropMonitorService) Stop(ctx context.Context) error
- func (ms *InteropMonitorService) Stopped() bool
- type Job
- func (j *Job) AddInitiatingHash(hash common.Hash)
- func (j *Job) DidMetrics() bool
- func (j *Job) ID() JobID
- func (j *Job) InitiatingHashes() []common.Hash
- func (j *Job) LastEvaluated() time.Time
- func (j *Job) LatestStatus() jobStatus
- func (j *Job) SetDidMetrics()
- func (j *Job) Statuses() []jobStatus
- func (j *Job) String() string
- func (j *Job) TerminalAt() time.Time
- func (j *Job) UpdateLastEvaluated(t time.Time)
- func (j *Job) UpdateStatus(status jobStatus)
- type JobFilter
- type JobID
- type MetricCollector
- type NewCallback
- type RPCFinder
- type RPCUpdater
- func (t *RPCUpdater) CollectForMetrics(jobs map[JobID]*Job) map[JobID]*Job
- func (t *RPCUpdater) Enqueue(job *Job)
- func (t *RPCUpdater) Run(ctx context.Context)
- func (t *RPCUpdater) ShouldExpire(job *Job) bool
- func (t *RPCUpdater) Start(ctx context.Context) error
- func (t *RPCUpdater) Stop() error
- func (t *RPCUpdater) Stopped() bool
- func (t *RPCUpdater) UpdateJob(job *Job) error
- func (t *RPCUpdater) UpdateJobStatus(job *Job)
- type SupervisorClient
- type Updater
- type UpdaterClient
Constants ¶
This section is empty.
Variables ¶
var ErrAlreadyStopped = errors.New("already stopped")
var ErrBlockNotContiguous = errors.New("blocks are not contiguous")
var ErrBlockNotFound = errors.New("block not found")
var ErrLogNotFound = errors.New("log not found")
var ErrNotExecutingMessage = errors.New("not an executing message")
Functions ¶
func Main ¶
func Main(version string) cliapp.LifecycleAction
Types ¶
type BlockBuffer ¶
type BlockBuffer struct {
// contains filtered or unexported fields
}
BlockBuffer is a circular buffer of seen blocks. It can be used as a fix-sized stack of blocks to ensure a canonical and contiguous view of the block history.
func NewBlockBuffer ¶
func NewBlockBuffer(size int) *BlockBuffer
NewBlockBuffer creates a new block buffer
func (*BlockBuffer) Add ¶
func (r *BlockBuffer) Add(block eth.BlockInfo)
Add adds a block to the buffer
func (*BlockBuffer) Peek ¶
func (r *BlockBuffer) Peek() eth.BlockInfo
Peek returns the last added block to the buffer if the buffer is empty, it returns nil if the buffer is not empty, it returns the last added block
type CLIConfig ¶
type FailsafeClient ¶
type FailsafeClient interface {
SetFailsafeEnabled(ctx context.Context, enabled bool) error
GetFailsafeEnabled(ctx context.Context) (bool, error)
}
FailsafeClient defines the interface for controlling failsafe functionality
type FinalityCallback ¶
FinalityCallback is a function to be called when the finality of jobs for this chain is updated
type FinderClient ¶
type FinderClient interface {
InfoByLabel(ctx context.Context, label eth.BlockLabel) (eth.BlockInfo, error)
InfoByNumber(ctx context.Context, number uint64) (eth.BlockInfo, error)
FetchReceiptsByNumber(ctx context.Context, number uint64) (eth.BlockInfo, types.Receipts, error)
}
FinderClient is a client that can be used to find new blocks and their receipts it is satisfied by the ethclient.Client type
type InteropMessageMetrics ¶
type InteropMessageMetrics interface {
RecordMessageStatus(executingChainID string, initiatingChainID string, status string, count float64)
RecordTerminalStatusChange(executingChainID string, initiatingChainID string, count float64)
RecordExecutingBlockRange(chainID string, min uint64, max uint64)
RecordInitiatingBlockRange(chainID string, min uint64, max uint64)
}
type InteropMonitorConfig ¶
type InteropMonitorService ¶
type InteropMonitorService struct {
Log log.Logger
Metrics metrics.Metricer
InteropMonitorConfig
Version string
// contains filtered or unexported fields
}
func InteropMonitorServiceFromClients ¶
func InteropMonitorServiceFromClients( ctx context.Context, version string, cfg *CLIConfig, clients map[eth.ChainID]*sources.EthClient, failsafeClients []FailsafeClient, log log.Logger, ) (*InteropMonitorService, error)
InteropMonitorServiceFromClients creates a new InteropMonitorService with pre-initialized clients
func (*InteropMonitorService) Kill ¶
func (ms *InteropMonitorService) Kill() error
func (*InteropMonitorService) RouteNewJob ¶
func (ms *InteropMonitorService) RouteNewJob(job *Job)
RouteNewJob routes a new job to the appropriate updater by simply enqueuing to the initiating chain's updater
func (*InteropMonitorService) SetExpiry ¶
func (ms *InteropMonitorService) SetExpiry(chainID eth.ChainID, expiry eth.BlockInfo)
SetExpiry sets the expiry for a chain ID
func (*InteropMonitorService) Start ¶
func (ms *InteropMonitorService) Start(ctx context.Context) error
func (*InteropMonitorService) Stop ¶
func (ms *InteropMonitorService) Stop(ctx context.Context) error
func (*InteropMonitorService) Stopped ¶
func (ms *InteropMonitorService) Stopped() bool
type Job ¶
type Job struct {
// contains filtered or unexported fields
}
Job is a job that is being tracked by the monitor it represents an executing message and initiating message pair it is used to track the status of the executing message over time along with pertinent metadata about the initiating message its getters and setters are thread safe
func BlockReceiptsToJobs ¶
BlockReceiptsToJobs converts a slice of receipts to a slice of jobs
func JobFromExecutingMessageLog ¶
JobFromExecutingMessageLog converts a log to a job
func (*Job) AddInitiatingHash ¶
AddInitiatingHash adds a hash to the initiatingHash slice if it hasn't been seen before
func (*Job) DidMetrics ¶
DidMetrics returns true if the job has been used to update the metrics at least once
func (*Job) InitiatingHashes ¶
InitiatingHashes returns a copy of the initiating hashes
func (*Job) LastEvaluated ¶
LastEvaluated returns the last evaluated time of the job
func (*Job) LatestStatus ¶
func (j *Job) LatestStatus() jobStatus
LatestStatus returns the latest status of the job
func (*Job) SetDidMetrics ¶
func (j *Job) SetDidMetrics()
SetDidMetrics sets the did metrics flag of the job
func (*Job) TerminalAt ¶
TerminalAt returns the time the job last transitioned to a terminal state
func (*Job) UpdateLastEvaluated ¶
UpdateLastEvaluated updates the last evaluated time of the job
func (*Job) UpdateStatus ¶
func (j *Job) UpdateStatus(status jobStatus)
UpdateStatus updates the status of the job
type JobFilter ¶
JobFilter is a function that turns any executing messages from a slice of receipts into a slice of jobs which can be added to an Updater's inbox
type MetricCollector ¶
type MetricCollector struct {
// contains filtered or unexported fields
}
func NewMetricCollector ¶
func NewMetricCollector(log log.Logger, m InteropMessageMetrics, updaters map[eth.ChainID]Updater, failsafeClients []FailsafeClient, triggerFailsafe bool) *MetricCollector
func (*MetricCollector) CheckFailsafeStatus ¶
func (m *MetricCollector) CheckFailsafeStatus()
func (*MetricCollector) CollectMetrics ¶
func (m *MetricCollector) CollectMetrics()
CollectMetrics scans the jobMaps, consolidates them, and updates the metrics
func (*MetricCollector) Run ¶
func (m *MetricCollector) Run()
Run is the main loop for the metric collector
func (*MetricCollector) Start ¶
func (m *MetricCollector) Start() error
func (*MetricCollector) Stop ¶
func (m *MetricCollector) Stop() error
func (*MetricCollector) Stopped ¶
func (m *MetricCollector) Stopped() bool
func (*MetricCollector) TriggerFailsafe ¶
func (m *MetricCollector) TriggerFailsafe()
type NewCallback ¶
type NewCallback func(*Job)
NewCallback is a function to be called when a new job is created
type RPCFinder ¶
type RPCFinder struct {
// contains filtered or unexported fields
}
RPCFinder connects to an Ethereum chain and extracts receipts in order to create jobs
func NewFinder ¶
func NewFinder(chainID eth.ChainID, client FinderClient, toCases JobFilter, newCallback NewCallback, finalityCallback FinalityCallback, bufferSize int, log log.Logger) *RPCFinder
type RPCUpdater ¶
type RPCUpdater struct {
// contains filtered or unexported fields
}
RPCFinder connects to an Ethereum chain and extracts receipts in order to create jobs
func NewUpdater ¶
func NewUpdater( chainID eth.ChainID, client UpdaterClient, finalized *locks.RWMap[eth.ChainID, eth.NumberAndHash], log log.Logger) *RPCUpdater
func (*RPCUpdater) CollectForMetrics ¶
func (t *RPCUpdater) CollectForMetrics(jobs map[JobID]*Job) map[JobID]*Job
GetJobs adds all jobs to the provided map and returns it
func (*RPCUpdater) Enqueue ¶
func (t *RPCUpdater) Enqueue(job *Job)
todo: make this a priority queue
func (*RPCUpdater) Run ¶
func (t *RPCUpdater) Run(ctx context.Context)
func (*RPCUpdater) ShouldExpire ¶
func (t *RPCUpdater) ShouldExpire(job *Job) bool
ShouldExpire returns true if the job should be expired jobs should only be expired when *both components* exist in finalized blocks. That is: - the initiating block is finalized - the executing block is finalized Before this point, the job status could change if a reorg affects either the initiating or executing block. This also checks that the job has been evaluated at least once, and counted for metrics at least once.
func (*RPCUpdater) Stop ¶
func (t *RPCUpdater) Stop() error
TODO: add wait group to make Stop return sync
func (*RPCUpdater) Stopped ¶
func (t *RPCUpdater) Stopped() bool
func (*RPCUpdater) UpdateJob ¶
func (t *RPCUpdater) UpdateJob(job *Job) error
func (*RPCUpdater) UpdateJobStatus ¶
func (t *RPCUpdater) UpdateJobStatus(job *Job)
type SupervisorClient ¶
type SupervisorClient struct {
// contains filtered or unexported fields
}
SupervisorClient provides functionality to call admin_setFailsafeEnabled on the supervisor
func NewSupervisorClient ¶
func NewSupervisorClient(endpoint string, log log.Logger) (*SupervisorClient, error)
NewSupervisorClient creates a new supervisor client
func (*SupervisorClient) Close ¶
func (sc *SupervisorClient) Close()
Close closes the underlying RPC client
func (*SupervisorClient) GetFailsafeEnabled ¶
func (sc *SupervisorClient) GetFailsafeEnabled(ctx context.Context) (bool, error)
GetFailsafeEnabled calls admin_getFailsafeEnabled on the supervisor
func (*SupervisorClient) SetFailsafeEnabled ¶
func (sc *SupervisorClient) SetFailsafeEnabled(ctx context.Context, enabled bool) error
SetFailsafeEnabled calls admin_setFailsafeEnabled on the supervisor