monitor

package
v1.16.13
Warning

This package is not in the latest version of its module.

Published: Apr 16, 2026 License: MIT Imports: 28 Imported by: 0

Documentation


Constants

This section is empty.

Variables

var ErrAlreadyStopped = errors.New("already stopped")
var ErrBlockNotContiguous = errors.New("blocks are not contiguous")
var ErrBlockNotFound = errors.New("block not found")
var ErrLogNotFound = errors.New("log not found")
var ErrNotExecutingMessage = errors.New("not an executing message")
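Because these are sentinel errors created with errors.New, callers would normally compare against them with errors.Is rather than by message text. A minimal sketch, assuming a locally redeclared copy of ErrBlockNotFound so that it compiles on its own; lookup and isNotFound are hypothetical helpers, not part of this package:

```go
package main

import (
	"errors"
	"fmt"
)

// ErrBlockNotFound is a local stand-in for monitor.ErrBlockNotFound.
var ErrBlockNotFound = errors.New("block not found")

// lookup is a hypothetical helper that wraps the sentinel with extra context.
func lookup(number uint64) error {
	return fmt.Errorf("fetching block %d: %w", number, ErrBlockNotFound)
}

// isNotFound reports whether err is, or wraps, ErrBlockNotFound.
func isNotFound(err error) bool {
	return errors.Is(err, ErrBlockNotFound)
}

func main() {
	err := lookup(42)
	fmt.Println(err, isNotFound(err))
}
```

Wrapping with %w keeps the sentinel reachable through errors.Is even after context has been added to the message.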

Functions

func Main

func Main(version string) cliapp.LifecycleAction

Types

type BlockBuffer

type BlockBuffer struct {
	// contains filtered or unexported fields
}

BlockBuffer is a circular buffer of seen blocks. It can be used as a fixed-size stack of blocks to ensure a canonical and contiguous view of the block history.

func NewBlockBuffer

func NewBlockBuffer(size int) *BlockBuffer

NewBlockBuffer creates a new block buffer

func (*BlockBuffer) Add

func (r *BlockBuffer) Add(block eth.BlockInfo)

Add adds a block to the buffer

func (*BlockBuffer) Peek

func (r *BlockBuffer) Peek() eth.BlockInfo

Peek returns the block most recently added to the buffer, or nil if the buffer is empty

func (*BlockBuffer) Pop

func (r *BlockBuffer) Pop() (eth.BlockInfo, error)

func (*BlockBuffer) Reset

func (r *BlockBuffer) Reset()

Reset resets the buffer to empty
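The fields of BlockBuffer are unexported, so the following is only a sketch of how a fixed-size circular buffer can act as a stack of recent blocks. The block struct, the ring type, errEmpty, and the extends helper are all hypothetical stand-ins, and the real Pop may report emptiness differently:

```go
package main

import (
	"errors"
	"fmt"
)

// block is a hypothetical stand-in for eth.BlockInfo.
type block struct {
	Number uint64
	Hash   string
	Parent string
}

var errEmpty = errors.New("buffer is empty") // hypothetical error value

// ring is a fixed-size circular buffer used as a stack of recent blocks.
// It assumes size > 0.
type ring struct {
	items []*block
	head  int // index of the most recently added block
	count int // number of valid entries, at most len(items)
}

func newRing(size int) *ring {
	return &ring{items: make([]*block, size), head: -1}
}

// Add pushes a block, overwriting the oldest entry once the buffer is full.
func (r *ring) Add(b *block) {
	r.head = (r.head + 1) % len(r.items)
	r.items[r.head] = b
	if r.count < len(r.items) {
		r.count++
	}
}

// Peek returns the most recently added block, or nil if the buffer is empty.
func (r *ring) Peek() *block {
	if r.count == 0 {
		return nil
	}
	return r.items[r.head]
}

// Pop removes and returns the most recently added block.
func (r *ring) Pop() (*block, error) {
	if r.count == 0 {
		return nil, errEmpty
	}
	b := r.items[r.head]
	r.items[r.head] = nil
	r.head = (r.head - 1 + len(r.items)) % len(r.items)
	r.count--
	return b, nil
}

// extends reports whether next builds directly on the current tip.
func (r *ring) extends(next *block) bool {
	tip := r.Peek()
	return tip == nil || (next.Parent == tip.Hash && next.Number == tip.Number+1)
}

func main() {
	r := newRing(2)
	r.Add(&block{Number: 1, Hash: "a"})
	r.Add(&block{Number: 2, Hash: "b", Parent: "a"})
	reorged := &block{Number: 3, Hash: "c'", Parent: "x"}
	for !r.extends(reorged) {
		r.Pop() // unwind until the new block connects or the buffer empties
	}
	fmt.Println(r.Peek() == nil)
}
```

Popping until the incoming block connects is one way a caller could keep the buffered history contiguous after a reorg.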

type CLIConfig

type CLIConfig struct {
	L2Rpcs              []string
	PollInterval        time.Duration
	SupervisorEndpoints []string
	TriggerFailsafe     bool
	RPCConfig           oprpc.CLIConfig
	LogConfig           oplog.CLIConfig
	MetricsConfig       opmetrics.CLIConfig
	PprofConfig         oppprof.CLIConfig
}

func NewConfig

func NewConfig(ctx *cli.Context) *CLIConfig

func (*CLIConfig) Check

func (c *CLIConfig) Check() error

type FailsafeClient

type FailsafeClient interface {
	SetFailsafeEnabled(ctx context.Context, enabled bool) error
	GetFailsafeEnabled(ctx context.Context) (bool, error)
}

FailsafeClient defines the interface for controlling failsafe functionality
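Since FailsafeClient is a small interface, it is easy to substitute an in-memory fake in tests. A minimal sketch, assuming the interface is redeclared locally so the example compiles on its own; fakeFailsafe, enableAll, and demo are hypothetical names, not part of this package:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// FailsafeClient mirrors the interface documented above.
type FailsafeClient interface {
	SetFailsafeEnabled(ctx context.Context, enabled bool) error
	GetFailsafeEnabled(ctx context.Context) (bool, error)
}

// fakeFailsafe is a hypothetical in-memory implementation for tests.
type fakeFailsafe struct {
	mu      sync.Mutex
	enabled bool
}

func (f *fakeFailsafe) SetFailsafeEnabled(ctx context.Context, enabled bool) error {
	if err := ctx.Err(); err != nil {
		return err // honor cancellation like a real RPC call would
	}
	f.mu.Lock()
	defer f.mu.Unlock()
	f.enabled = enabled
	return nil
}

func (f *fakeFailsafe) GetFailsafeEnabled(ctx context.Context) (bool, error) {
	if err := ctx.Err(); err != nil {
		return false, err
	}
	f.mu.Lock()
	defer f.mu.Unlock()
	return f.enabled, nil
}

// enableAll turns the failsafe on for every client and stops at the first error.
func enableAll(ctx context.Context, clients []FailsafeClient) error {
	for _, c := range clients {
		if err := c.SetFailsafeEnabled(ctx, true); err != nil {
			return err
		}
	}
	return nil
}

// demo enables the failsafe on two fakes and reads back the second one.
func demo() (bool, error) {
	ctx := context.Background()
	a, b := &fakeFailsafe{}, &fakeFailsafe{}
	if err := enableAll(ctx, []FailsafeClient{a, b}); err != nil {
		return false, err
	}
	return b.GetFailsafeEnabled(ctx)
}

func main() {
	fmt.Println(demo())
}
```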

type FinalityCallback

type FinalityCallback func(chainID eth.ChainID, block eth.BlockInfo)

FinalityCallback is a function to be called when the finality of jobs for this chain is updated

type Finder

type Finder interface {
	Start(ctx context.Context) error
	Stop() error
}

Finders are responsible for finding new jobs from a chain for an Updater to track
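The Start/Stop pair suggests a background loop with an explicit lifecycle. The sketch below shows one common way to structure such a loop in Go; it is not the package's implementation. The poller type, its fields, and the choice to return an "already stopped" error on a second Stop are assumptions, and the sketch expects Start to be called before Stop:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// errAlreadyStopped is a local stand-in for monitor.ErrAlreadyStopped.
var errAlreadyStopped = errors.New("already stopped")

// poller is a hypothetical Finder-like service that calls poll on an interval.
type poller struct {
	interval time.Duration
	poll     func()
	cancel   context.CancelFunc
	done     chan struct{}
	stopped  atomic.Bool
}

// Start launches the polling goroutine and returns immediately.
func (p *poller) Start(ctx context.Context) error {
	ctx, p.cancel = context.WithCancel(ctx)
	p.done = make(chan struct{})
	go func() {
		defer close(p.done)
		ticker := time.NewTicker(p.interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				p.poll()
			}
		}
	}()
	return nil
}

// Stop cancels the loop and waits for it to exit; a second call is rejected.
func (p *poller) Stop() error {
	if !p.stopped.CompareAndSwap(false, true) {
		return errAlreadyStopped
	}
	p.cancel()
	<-p.done
	return nil
}

// Stopped reports whether Stop has been called.
func (p *poller) Stopped() bool { return p.stopped.Load() }

// demo runs the poller briefly and stops it twice.
func demo() (polls int64, first, second error) {
	var n atomic.Int64
	p := &poller{interval: time.Millisecond, poll: func() { n.Add(1) }}
	_ = p.Start(context.Background())
	time.Sleep(50 * time.Millisecond)
	first = p.Stop()
	second = p.Stop()
	return n.Load(), first, second
}

func main() {
	polls, first, second := demo()
	fmt.Println(polls > 0, first, second)
}
```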

type FinderClient

type FinderClient interface {
	InfoByLabel(ctx context.Context, label eth.BlockLabel) (eth.BlockInfo, error)
	InfoByNumber(ctx context.Context, number uint64) (eth.BlockInfo, error)
	FetchReceiptsByNumber(ctx context.Context, number uint64) (eth.BlockInfo, types.Receipts, error)
}

FinderClient is a client that can be used to find new blocks and their receipts. It is satisfied by the ethclient.Client type

type InteropMessageMetrics

type InteropMessageMetrics interface {
	RecordMessageStatus(executingChainID string, initiatingChainID string, status string, count float64)
	RecordTerminalStatusChange(executingChainID string, initiatingChainID string, count float64)
	RecordExecutingBlockRange(chainID string, min uint64, max uint64)
	RecordInitiatingBlockRange(chainID string, min uint64, max uint64)
}
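The interface uses only built-in types, so an in-memory implementation is straightforward to write for tests. A minimal sketch, assuming the interface is redeclared locally; the recorder type, its key format, the "valid" status string, and the choice to treat RecordMessageStatus as a gauge are all assumptions rather than documented behavior. It is not safe for concurrent use:

```go
package main

import "fmt"

// InteropMessageMetrics mirrors the interface documented above.
type InteropMessageMetrics interface {
	RecordMessageStatus(executingChainID string, initiatingChainID string, status string, count float64)
	RecordTerminalStatusChange(executingChainID string, initiatingChainID string, count float64)
	RecordExecutingBlockRange(chainID string, min uint64, max uint64)
	RecordInitiatingBlockRange(chainID string, min uint64, max uint64)
}

// recorder is a hypothetical in-memory implementation for tests.
type recorder struct {
	status map[string]float64   // keyed by "executing/initiating/status"
	ranges map[string][2]uint64 // keyed by "executing/<chain>" or "initiating/<chain>"
}

func newRecorder() *recorder {
	return &recorder{status: map[string]float64{}, ranges: map[string][2]uint64{}}
}

// RecordMessageStatus overwrites the stored value (gauge-like; an assumption).
func (r *recorder) RecordMessageStatus(exec, init, status string, count float64) {
	r.status[exec+"/"+init+"/"+status] = count
}

// RecordTerminalStatusChange accumulates the count (counter-like; an assumption).
func (r *recorder) RecordTerminalStatusChange(exec, init string, count float64) {
	r.status[exec+"/"+init+"/terminal"] += count
}

func (r *recorder) RecordExecutingBlockRange(chainID string, lo, hi uint64) {
	r.ranges["executing/"+chainID] = [2]uint64{lo, hi}
}

func (r *recorder) RecordInitiatingBlockRange(chainID string, lo, hi uint64) {
	r.ranges["initiating/"+chainID] = [2]uint64{lo, hi}
}

// Compile-time check that recorder satisfies the interface.
var _ InteropMessageMetrics = (*recorder)(nil)

func main() {
	r := newRecorder()
	var m InteropMessageMetrics = r
	m.RecordMessageStatus("10", "8453", "valid", 3)
	m.RecordExecutingBlockRange("10", 100, 120)
	fmt.Println(r.status["10/8453/valid"], r.ranges["executing/10"])
}
```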

type InteropMonitorConfig

type InteropMonitorConfig struct {
	PollInterval time.Duration
}

type InteropMonitorService

type InteropMonitorService struct {
	Log     log.Logger
	Metrics metrics.Metricer

	InteropMonitorConfig

	Version string
	// contains filtered or unexported fields
}

func InteropMonitorServiceFromCLIConfig

func InteropMonitorServiceFromCLIConfig(ctx context.Context, version string, cfg *CLIConfig, log log.Logger) (*InteropMonitorService, error)

func InteropMonitorServiceFromClients

func InteropMonitorServiceFromClients(
	ctx context.Context,
	version string,
	cfg *CLIConfig,
	clients map[eth.ChainID]*sources.EthClient,
	failsafeClients []FailsafeClient,
	log log.Logger,
) (*InteropMonitorService, error)

InteropMonitorServiceFromClients creates a new InteropMonitorService with pre-initialized clients

func (*InteropMonitorService) Kill

func (ms *InteropMonitorService) Kill() error

func (*InteropMonitorService) RouteNewJob

func (ms *InteropMonitorService) RouteNewJob(job *Job)

RouteNewJob routes a new job to the appropriate updater by simply enqueuing to the initiating chain's updater
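The routing rule described above amounts to a map lookup keyed by the initiating chain. A minimal sketch with hypothetical stand-in types (chainID, job, updater, queue, router); what the real service does when no updater is registered for a chain is not documented here, so returning false is an assumption:

```go
package main

import "fmt"

// chainID is a hypothetical stand-in for eth.ChainID.
type chainID uint64

// job is a hypothetical stand-in for Job, reduced to the two chain IDs.
type job struct {
	initiating chainID
	executing  chainID
}

// updater is the subset of the Updater interface that routing needs.
type updater interface {
	Enqueue(j *job)
}

// queue is a trivial updater that just remembers what it was given.
type queue struct{ jobs []*job }

func (q *queue) Enqueue(j *job) { q.jobs = append(q.jobs, j) }

// router holds one updater per chain.
type router struct {
	updaters map[chainID]updater
}

// route enqueues the job on the initiating chain's updater.
// It returns false when no updater is registered for that chain.
func (r *router) route(j *job) bool {
	u, ok := r.updaters[j.initiating]
	if !ok {
		return false
	}
	u.Enqueue(j)
	return true
}

func main() {
	q10, q8453 := &queue{}, &queue{}
	r := &router{updaters: map[chainID]updater{10: q10, 8453: q8453}}
	r.route(&job{initiating: 8453, executing: 10})
	fmt.Println(len(q10.jobs), len(q8453.jobs))
}
```

Keying on the initiating chain means the updater that can fetch the initiating block's receipts is the one that owns the job.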

func (*InteropMonitorService) SetExpiry

func (ms *InteropMonitorService) SetExpiry(chainID eth.ChainID, expiry eth.BlockInfo)

SetExpiry sets the expiry for a chain ID

func (*InteropMonitorService) Start

func (ms *InteropMonitorService) Start(ctx context.Context) error

func (*InteropMonitorService) Stop

func (*InteropMonitorService) Stopped

func (ms *InteropMonitorService) Stopped() bool

type Job

type Job struct {
	// contains filtered or unexported fields
}

Job is a job that is being tracked by the monitor. It represents an executing message and initiating message pair, and is used to track the status of the executing message over time along with pertinent metadata about the initiating message. Its getters and setters are thread safe.

func BlockReceiptsToJobs

func BlockReceiptsToJobs(receipts []*types.Receipt, executingChain eth.ChainID) []*Job

BlockReceiptsToJobs converts a slice of receipts to a slice of jobs

func JobFromExecutingMessageLog

func JobFromExecutingMessageLog(log *types.Log, executingChain eth.ChainID) (Job, error)

JobFromExecutingMessageLog converts a log to a job

func (*Job) AddInitiatingHash

func (j *Job) AddInitiatingHash(hash common.Hash)

AddInitiatingHash adds a hash to the initiatingHash slice if it hasn't been seen before

func (*Job) DidMetrics

func (j *Job) DidMetrics() bool

DidMetrics returns true if the job has been used to update the metrics at least once

func (*Job) ID

func (j *Job) ID() JobID

ID returns the ID of the job

func (*Job) InitiatingHashes

func (j *Job) InitiatingHashes() []common.Hash

InitiatingHashes returns a copy of the initiating hashes
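AddInitiatingHash and InitiatingHashes together describe a deduplicating append and a defensive copy, both behind a lock. Job's fields are unexported, so this is only a sketch of that pattern; the hash and job types below are hypothetical stand-ins:

```go
package main

import (
	"fmt"
	"sync"
)

// hash is a hypothetical stand-in for common.Hash.
type hash [32]byte

// job holds a deduplicated list of hashes guarded by a mutex.
type job struct {
	mu     sync.RWMutex
	hashes []hash
}

// AddInitiatingHash appends h unless it has been seen before.
func (j *job) AddInitiatingHash(h hash) {
	j.mu.Lock()
	defer j.mu.Unlock()
	for _, seen := range j.hashes {
		if seen == h {
			return
		}
	}
	j.hashes = append(j.hashes, h)
}

// InitiatingHashes returns a copy so callers cannot mutate internal state.
func (j *job) InitiatingHashes() []hash {
	j.mu.RLock()
	defer j.mu.RUnlock()
	out := make([]hash, len(j.hashes))
	copy(out, j.hashes)
	return out
}

func main() {
	j := &job{}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(b byte) {
			defer wg.Done()
			j.AddInitiatingHash(hash{b % 2}) // only two distinct hashes
		}(byte(i))
	}
	wg.Wait()
	fmt.Println(len(j.InitiatingHashes()))
}
```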

func (*Job) LastEvaluated

func (j *Job) LastEvaluated() time.Time

LastEvaluated returns the last evaluated time of the job

func (*Job) LatestStatus

func (j *Job) LatestStatus() jobStatus

LatestStatus returns the latest status of the job

func (*Job) SetDidMetrics

func (j *Job) SetDidMetrics()

SetDidMetrics sets the did metrics flag of the job

func (*Job) Statuses

func (j *Job) Statuses() []jobStatus

Statuses returns the states of the job

func (*Job) String

func (j *Job) String() string

String returns a string representation of the job

func (*Job) TerminalAt

func (j *Job) TerminalAt() time.Time

TerminalAt returns the time the job last transitioned to a terminal state

func (*Job) UpdateLastEvaluated

func (j *Job) UpdateLastEvaluated(t time.Time)

UpdateLastEvaluated updates the last evaluated time of the job

func (*Job) UpdateStatus

func (j *Job) UpdateStatus(status jobStatus)

UpdateStatus updates the status of the job

type JobFilter

type JobFilter func(receipts []*types.Receipt, executingChain eth.ChainID) []*Job

JobFilter is a function that turns any executing messages from a slice of receipts into a slice of jobs which can be added to an Updater's inbox

type JobID

type JobID string

func JobId

func JobId(
	executingBlockNumber uint64,
	executingLogIndex uint,
	executingPayload common.Hash,
	executingChain eth.ChainID,
	intitiatingBlockNumber uint64,
	logIndex uint32,
	initiatingChain eth.ChainID,
) JobID

type MetricCollector

type MetricCollector struct {
	// contains filtered or unexported fields
}

func NewMetricCollector

func NewMetricCollector(log log.Logger, m InteropMessageMetrics, updaters map[eth.ChainID]Updater, failsafeClients []FailsafeClient, triggerFailsafe bool) *MetricCollector

func (*MetricCollector) CheckFailsafeStatus

func (m *MetricCollector) CheckFailsafeStatus()

func (*MetricCollector) CollectMetrics

func (m *MetricCollector) CollectMetrics()

CollectMetrics scans the jobMaps, consolidates them, and updates the metrics
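One way to read "scans, consolidates, and updates" is: merge each updater's jobs into a single map keyed by job ID, then count jobs per (executing chain, initiating chain, status) before reporting. The sketch below illustrates that reading only; the job, updater, and key types and the status strings are hypothetical, and the real collector may aggregate differently:

```go
package main

import "fmt"

// job is a hypothetical stand-in for Job with just the fields needed here.
type job struct {
	id, executing, initiating, status string
}

// updater is a hypothetical stand-in holding the jobs one chain is tracking.
type updater struct {
	jobs map[string]*job
}

// collect adds the updater's jobs to dst and returns it,
// in the style of CollectForMetrics.
func (u *updater) collect(dst map[string]*job) map[string]*job {
	for id, j := range u.jobs {
		dst[id] = j
	}
	return dst
}

// key groups jobs for reporting.
type key struct {
	executing, initiating, status string
}

// consolidate merges all updaters' jobs by ID, then counts them per key.
func consolidate(updaters []*updater) map[key]float64 {
	all := map[string]*job{}
	for _, u := range updaters {
		all = u.collect(all)
	}
	counts := map[key]float64{}
	for _, j := range all {
		counts[key{j.executing, j.initiating, j.status}]++
	}
	return counts
}

func main() {
	shared := &job{id: "j1", executing: "10", initiating: "8453", status: "valid"}
	a := &updater{jobs: map[string]*job{"j1": shared}}
	b := &updater{jobs: map[string]*job{
		"j1": shared, // same job seen by two updaters is counted once
		"j2": {id: "j2", executing: "10", initiating: "8453", status: "valid"},
	}}
	fmt.Println(consolidate([]*updater{a, b})[key{"10", "8453", "valid"}])
}
```

The resulting counts map has the same shape as the arguments to InteropMessageMetrics.RecordMessageStatus, which is why the sketch groups by those three strings.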

func (*MetricCollector) Run

func (m *MetricCollector) Run()

Run is the main loop for the metric collector

func (*MetricCollector) Start

func (m *MetricCollector) Start() error

func (*MetricCollector) Stop

func (m *MetricCollector) Stop() error

func (*MetricCollector) Stopped

func (m *MetricCollector) Stopped() bool

func (*MetricCollector) TriggerFailsafe

func (m *MetricCollector) TriggerFailsafe()

type NewCallback

type NewCallback func(*Job)

NewCallback is a function to be called when a new job is created

type RPCFinder

type RPCFinder struct {
	// contains filtered or unexported fields
}

RPCFinder connects to an Ethereum chain and extracts receipts in order to create jobs

func NewFinder

func NewFinder(chainID eth.ChainID,
	client FinderClient,
	toCases JobFilter,
	newCallback NewCallback,
	finalityCallback FinalityCallback,
	bufferSize int,
	log log.Logger) *RPCFinder

func (*RPCFinder) Run

func (t *RPCFinder) Run(ctx context.Context)

func (*RPCFinder) Start

func (t *RPCFinder) Start(ctx context.Context) error

func (*RPCFinder) Stop

func (t *RPCFinder) Stop() error

func (*RPCFinder) Stopped

func (t *RPCFinder) Stopped() bool

type RPCUpdater

type RPCUpdater struct {
	// contains filtered or unexported fields
}

RPCUpdater connects to an Ethereum chain and fetches receipts in order to update the status of tracked jobs

func NewUpdater

func NewUpdater(
	chainID eth.ChainID,
	client UpdaterClient,
	finalized *locks.RWMap[eth.ChainID, eth.NumberAndHash],
	log log.Logger) *RPCUpdater

func (*RPCUpdater) CollectForMetrics

func (t *RPCUpdater) CollectForMetrics(jobs map[JobID]*Job) map[JobID]*Job

CollectForMetrics adds all jobs to the provided map and returns it

func (*RPCUpdater) Enqueue

func (t *RPCUpdater) Enqueue(job *Job)

TODO: make this a priority queue

func (*RPCUpdater) Run

func (t *RPCUpdater) Run(ctx context.Context)

func (*RPCUpdater) ShouldExpire

func (t *RPCUpdater) ShouldExpire(job *Job) bool

ShouldExpire returns true if the job should be expired. Jobs should only be expired when *both components* exist in finalized blocks. That is:
- the initiating block is finalized
- the executing block is finalized
Before this point, the job status could change if a reorg affects either the initiating or executing block. ShouldExpire also checks that the job has been evaluated at least once and counted for metrics at least once.
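The conditions above can be written as a single predicate. This is a sketch under stated assumptions, not the package's code: the job struct, its field names, and the use of plain block-number comparisons against per-chain finalized heights are all hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// job is a hypothetical stand-in for Job with only the fields the check needs.
type job struct {
	initiatingBlock uint64
	executingBlock  uint64
	lastEvaluated   time.Time // zero until the job has been evaluated once
	didMetrics      bool      // true once the job has been counted in metrics
}

// markEvaluated records that the job has been evaluated now.
func (j *job) markEvaluated() { j.lastEvaluated = time.Now() }

// shouldExpire applies the rules described above: the job must have been
// evaluated and counted at least once, and both of its blocks must be at or
// below the finalized height of their respective chains.
func shouldExpire(j *job, finalizedInitiating, finalizedExecuting uint64) bool {
	if j.lastEvaluated.IsZero() || !j.didMetrics {
		return false
	}
	return j.initiatingBlock <= finalizedInitiating &&
		j.executingBlock <= finalizedExecuting
}

func main() {
	j := &job{initiatingBlock: 90, executingBlock: 100}
	j.markEvaluated()
	j.didMetrics = true
	fmt.Println(shouldExpire(j, 95, 99))  // executing block not yet finalized
	fmt.Println(shouldExpire(j, 95, 100)) // both blocks finalized
}
```

Requiring both sides to be finalized means a job is never dropped while a reorg on either chain could still change its status.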

func (*RPCUpdater) Start

func (t *RPCUpdater) Start(ctx context.Context) error

func (*RPCUpdater) Stop

func (t *RPCUpdater) Stop() error

TODO: add wait group to make Stop return sync

func (*RPCUpdater) Stopped

func (t *RPCUpdater) Stopped() bool

func (*RPCUpdater) UpdateJob

func (t *RPCUpdater) UpdateJob(job *Job) error

func (*RPCUpdater) UpdateJobStatus

func (t *RPCUpdater) UpdateJobStatus(job *Job)

type SupervisorClient

type SupervisorClient struct {
	// contains filtered or unexported fields
}

SupervisorClient provides functionality to call admin_setFailsafeEnabled on the supervisor

func NewSupervisorClient

func NewSupervisorClient(endpoint string, log log.Logger) (*SupervisorClient, error)

NewSupervisorClient creates a new supervisor client

func (*SupervisorClient) Close

func (sc *SupervisorClient) Close()

Close closes the underlying RPC client

func (*SupervisorClient) GetFailsafeEnabled

func (sc *SupervisorClient) GetFailsafeEnabled(ctx context.Context) (bool, error)

GetFailsafeEnabled calls admin_getFailsafeEnabled on the supervisor

func (*SupervisorClient) SetFailsafeEnabled

func (sc *SupervisorClient) SetFailsafeEnabled(ctx context.Context, enabled bool) error

SetFailsafeEnabled calls admin_setFailsafeEnabled on the supervisor

type Updater

type Updater interface {
	Start(ctx context.Context) error
	Enqueue(job *Job)
	Stop() error
	CollectForMetrics(jobs map[JobID]*Job) map[JobID]*Job
}

Updaters are responsible for updating jobs from a chain for the metric collector to track

type UpdaterClient

type UpdaterClient interface {
	FetchReceiptsByNumber(ctx context.Context, number uint64) (eth.BlockInfo, types.Receipts, error)
}
