crawler

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 20, 2025 License: MIT Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
// Dial states and the delay associated with each of them.
// NOTE(review): "Possitive" and "Dalay" are misspelled in the exported names
// below; they are left as-is because renaming them would break callers.
const (
	// Error States (DialState values, numbered from 0 by iota)
	ZeroState DialState = iota
	NegativeWithHopeState
	NegativeWithoutHopeState
	PossitiveState

	// Delays based on the Error State
	// (presumably what DialState.DelayFromState returns — confirm)
	ZeroDelay                = 0 * time.Minute
	NegativeWithHopeDelay    = 3 * time.Minute
	NegativeWithoutHopeDalay = 20 * time.Minute
	PossitiveDelay           = 10 * time.Minute
)
View Source
const (
	// InitDelay is a fixed two-second delay.
	// NOTE(review): presumably waited before the first action at start-up — confirm against its usage.
	InitDelay = 2 * time.Second
)
View Source
const (
	// MetricLoopInterval is a 15-second interval.
	// NOTE(review): presumably the period of the metrics export loop — confirm against its usage.
	MetricLoopInterval time.Duration = 15 * time.Second
)
View Source
const (
	// Timeout is a fixed 15-second timeout.
	// NOTE(review): this page does not show which operation it bounds — confirm against its usage.
	Timeout = 15 * time.Second
)

Variables

View Source
var (
	// Default values for the crawler settings. The names mirror the fields of
	// CrawlerRunConf (presumably applied by NewDefaultRun — confirm).
	// NOTE(review): DefaultDBEndpoint embeds placeholder credentials, and the
	// value of DefaultIPAPIUrl is elided on this page.
	DefaultLogLevel             = "info"
	DefaultDBEndpoint           = "postgresql://user:password@localhost:5432/ragnodb"
	DefaultHostIP               = "0.0.0.0"
	DefaultHostPort             = 9050
	DefaultMetricsIP            = "localhost"
	DefaultMetricsPort          = 9070
	DefaultMetricsEndpoint      = "metrics"
	DefaultConcurrentDialers    = 150
	DefaultConcurrentPersisters = 2
	DefaultConnTimeout          = 30 * time.Second
	DefaultSnapshotInterval     = 30 * time.Minute
	DefaultIPAPIUrl             = "" /* 169-byte string literal not displayed */
	DefaultDeprecationTime      = 48 * time.Hour
)
View Source
// Short string labels for connection errors
// (presumably what ParseConnError returns — confirm).
// Every label except ErrorUnknown is a key of KnownErrors.
var (
	// Generic labels
	ErrorNone    = "none"
	ErrorUnknown = "unknown"

	// Labels for specific errors
	ErrorEOF                    = "eof"
	ErrorDisconnectRequested    = "disconnect_requested"
	ErrorDecodeRLPdisconnect    = "rlp_decode_disconnect"
	ErrorUselessPeer            = "useless_peer"
	ErrorBadHandshake           = "bad_handshake"
	ErrorBadHandshake2          = "bad_handshake_code_2"
	ErrorTimeout                = "time_out"
	ErrorSnappyCorruptedInput   = "snappy_input_corrupted"
	ErrorSubprotocol            = "subprotocol_error"
	ErrorConnectionRefused      = "connection_refused"
	ErrorConnectionReset        = "connection_reset_by_peer"
	ErrorTooManyPeers           = "too_many_peers"
	ErrorIOTimeout              = "i/o_timeout"
	ErrorNoRouteToHost          = "no_route_to_host"
	ErrorProtocolNegotiation    = "eth_protocols_negotiation"
	ErrorBadHandshakeDisconnect = "bad_handshake_disconnect"
)
View Source
var (

	// List of metrics that we are going to export.
	// Every metric is a Prometheus gauge registered under the moduleName namespace.

	// ClientDistribution is a gauge vector with one series per "client" label value.
	ClientDistribution = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: moduleName,
		Name:      "observed_client_distribution",
		Help:      "Number of nodes using the clients seen",
	},
		[]string{"client"},
	)
	// VersionDistribution is a gauge vector with one series per "client_version" label value.
	VersionDistribution = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: moduleName,
		Name:      "observed_client_version_distribution",
		Help:      "Number of nodes from each of the client's versions",
	},
		[]string{"client_version"},
	)
	// GeoDistribution is a gauge vector with one series per "country" label value.
	GeoDistribution = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: moduleName,
		Name:      "geographical_distribution",
		Help:      "Number of nodes from each country",
	},
		[]string{"country"},
	)
	// NodeDistribution is a single, unlabelled gauge.
	NodeDistribution = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: moduleName,
		Name:      "node_distribution",
		Help:      "Total number of nodes",
	})
	// DeprecatedCount is a single, unlabelled gauge.
	DeprecatedCount = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: moduleName,
		Name:      "deprecated_nodes",
		Help:      "Total number of deprecated nodes",
	})
	// OsDistribution is a gauge vector with one series per "os" label value.
	OsDistribution = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: moduleName,
		Name:      "os_distribution",
		Help:      "OS distribution of connected nodes",
	},
		[]string{"os"},
	)
	// ArchDistribution is a gauge vector with one series per "arch" label value.
	ArchDistribution = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: moduleName,
		Name:      "arch_distribution",
		Help:      "Architecture distribution of the active nodes in the network",
	},
		[]string{"arch"},
	)
	// HostedPeers is a gauge vector with one series per "ip_host" label value.
	HostedPeers = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: moduleName,
		Name:      "hosted_peers_distribution",
		Help:      "Distribution of nodes that are hosted on non-residential networks",
	},
		[]string{"ip_host"},
	)
	// RttDist is a gauge vector with one series per "secs" label value.
	RttDist = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: moduleName,
		Name:      "observed_rtt_distribution",
		Help:      "Distribution of RTT between the crawler and nodes in the network",
	},
		[]string{"secs"},
	)
	// IPDist is a gauge vector with one series per "numbernodes" label value.
	IPDist = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: moduleName,
		Name:      "observed_ip_distribution",
		Help:      "Distribution of IPs hosting nodes in the network",
	},
		[]string{"numbernodes"},
	)
)
View Source
// KnownErrors maps each error label to the message text associated with it.
// ErrorUnknown has no entry.
// NOTE(review): several values end in ": " or are fragments of longer messages,
// so they are presumably matched as substrings of the real error text (likely
// by ParseConnError) — confirm against the implementation.
var KnownErrors = map[string]string{
	ErrorNone:                   "None",
	ErrorEOF:                    "EOF",
	ErrorDisconnectRequested:    "disconnect requested",
	ErrorDecodeRLPdisconnect:    "rlp: expected input list for ethtest.Disconnect",
	ErrorUselessPeer:            "useless peer",
	ErrorBadHandshake:           "bad handshake: ",
	ErrorBadHandshake2:          "bad status handshake code: 2",
	ErrorTimeout:                "connect: connection timed out",
	ErrorSnappyCorruptedInput:   "snappy: corrupt input",
	ErrorSubprotocol:            "subprotocol error",
	ErrorConnectionRefused:      "connect: connection refused",
	ErrorConnectionReset:        "connection reset by peer",
	ErrorTooManyPeers:           "too many peers",
	ErrorIOTimeout:              "i/o timeout",
	ErrorNoRouteToHost:          "connect: no route to host",
	ErrorProtocolNegotiation:    "eth protocols negotiation",
	ErrorBadHandshakeDisconnect: "bad status handshake disconnect: ",
}

Functions

func GetPublicIP

func GetPublicIP() (net.IP, error)

func ParseConnError

func ParseConnError(err error) string

func ParseLogLevel

func ParseLogLevel(level string) logrus.Level

Types

type Crawler

type Crawler struct {

	// IP locator
	IPLocator *apis.IPLocator
	// contains filtered or unexported fields
}

func NewCrawler

func NewCrawler(ctx context.Context, conf CrawlerRunConf) (*Crawler, error)

func (*Crawler) Close

func (c *Crawler) Close()

func (*Crawler) GetClientDistributionMetrics

func (crawler *Crawler) GetClientDistributionMetrics() *metrics.Metric

func (*Crawler) GetMetrics

func (crawler *Crawler) GetMetrics() *metrics.MetricsModule

func (*Crawler) Run

func (c *Crawler) Run() error

type CrawlerRunConf

// CrawlerRunConf groups the crawler's run settings. Each field carries a
// yaml tag naming the key it is read from or written to in a YAML document.
type CrawlerRunConf struct {
	// Logging and storage
	LogLevel         string        `yaml:"log-level"`
	DbEndpoint       string        `yaml:"db-endpoint"`
	// Address of the crawler host
	HostIP           string        `yaml:"ip"`
	HostPort         int           `yaml:"port"`
	// Address and path of the metrics endpoint
	MetricsIP        string        `yaml:"metrics-ip"`
	MetricsPort      int           `yaml:"metrics-port"`
	MetricsEndpoint  string        `yaml:"metrics-endpoint"`
	// Worker counts
	Dialers          int           `yaml:"dialers"`
	Persisters       int           `yaml:"persisters"`
	// Timing settings, expressed as time.Duration values
	ConnTimeout      time.Duration `yaml:"conn-timeout"`
	SnapshotInterval time.Duration `yaml:"snapshot-interval"`
	IPAPIUrl         string        `yaml:"ip-api-url"`
	DeprecationTime  time.Duration `yaml:"deprecation-time"`
}

func NewDefaultRun

func NewDefaultRun() *CrawlerRunConf

func (*CrawlerRunConf) Apply

func (c *CrawlerRunConf) Apply(ctx *cli.Context) error

Only the configuration for the Execution Layer's crawler (RunCommand) is considered.

type DialState

type DialState int8

func ParseStateFromError

func ParseStateFromError(err string) (state DialState)

func (DialState) DelayFromState

func (s DialState) DelayFromState() (delay time.Duration)

func (DialState) StateToString

func (s DialState) StateToString() (str string)

type Host

type Host struct {
	// contains filtered or unexported fields
}

func NewHost

func NewHost(ctx context.Context, ip string, port int, timeout time.Duration, opts ...HostOption) (*Host, error)

func (*Host) Close

func (h *Host) Close()

func (*Host) Connect

func (h *Host) Connect(remoteNode *models.HostInfo) (ethtest.HandshakeDetails, models.ChainDetails, error)

Connect attempts to connect to a given node, getting a list of details from each handshake.

type HostOption

type HostOption func(*Host) error

func WithCustomCaps

func WithCustomCaps(caps []p2p.Cap) HostOption

WithCustomCaps sets custom caps that are not the mainnet ones.

func WithHighestProtoVersion

func WithHighestProtoVersion(version int) HostOption

WithHighestProtoVersion selects any custom highest protocol version.

func WithPrivKey

func WithPrivKey(privk *ecdsa.PrivateKey) HostOption

WithPrivKey overrides the new key with a custom one (to have the same node_id).

type Llvl

// Llvl is a string-based log level name.
type Llvl string
// Log level names available as Llvl values.
// NOTE(review): the error level is named ErrorL rather than Error, presumably
// to avoid clashing with another identifier — confirm.
var (
	Trace  Llvl = "trace"
	Debug  Llvl = "debug"
	Info   Llvl = "info"
	Warn   Llvl = "warn"
	ErrorL Llvl = "error"
)

type NodeOrderedSet

type NodeOrderedSet struct {
	// contains filtered or unexported fields
}

List of Nodes ordered by time for next connection

func NewNodeOrderedSet

func NewNodeOrderedSet() *NodeOrderedSet

func (*NodeOrderedSet) AddNode

func (s *NodeOrderedSet) AddNode(hInfo models.HostInfo)

func (*NodeOrderedSet) GetNode

func (s *NodeOrderedSet) GetNode(nodeID enode.ID) (*QueuedNode, bool)

GetNode retrieves the info of the requested node

func (*NodeOrderedSet) IsEmpty

func (s *NodeOrderedSet) IsEmpty() bool

func (*NodeOrderedSet) IsPeerAlready

func (s *NodeOrderedSet) IsPeerAlready(nodeID enode.ID) bool

func (*NodeOrderedSet) IsThereNext

func (s *NodeOrderedSet) IsThereNext() bool

IsThereNext returns a boolean indicating whether there is a new item ready to be read.

func (*NodeOrderedSet) Len

func (s *NodeOrderedSet) Len() int

func (*NodeOrderedSet) Less

func (s *NodeOrderedSet) Less(i, j int) bool

Less is part of sort.Interface. We use c.PeerList.NextConnection as the value to sort by.

func (*NodeOrderedSet) NextNode

func (s *NodeOrderedSet) NextNode() QueuedNode

func (*NodeOrderedSet) OrderSet

func (s *NodeOrderedSet) OrderSet()

Sorting methods for PeerQueue. OrderSet sorts the items based on their next connection time.

func (*NodeOrderedSet) RemoveNode

func (s *NodeOrderedSet) RemoveNode(nodeID enode.ID)

func (*NodeOrderedSet) Swap

func (s *NodeOrderedSet) Swap(i, j int)

Swap is part of sort.Interface.

func (*NodeOrderedSet) UpdateListFromSet

func (s *NodeOrderedSet) UpdateListFromSet(nSet []models.HostInfo)

func (*NodeOrderedSet) UpdateNodeFromConnAttempt

func (s *NodeOrderedSet) UpdateNodeFromConnAttempt(
	nodeID enode.ID, connAttempt *models.ConnectionAttempt, sameNetwork bool,
	deprecationTime time.Duration,
)

type Peering

type Peering struct {
	IPLocator *apis.IPLocator
	// contains filtered or unexported fields
}

func NewPeeringService

func NewPeeringService(
	ctx context.Context, h *Host, database *db.PostgresDBService, dialers int,
	deprecationTime time.Duration, IPLocator *apis.IPLocator,
) *Peering

func (*Peering) Close

func (p *Peering) Close()

func (*Peering) Connect

func (p *Peering) Connect(hInfo models.HostInfo)

Connect applies the logic of connecting to the remote node and persists the necessary results from the attempt.

func (*Peering) Run

func (p *Peering) Run() error

type QueuedNode

type QueuedNode struct {
	// contains filtered or unexported fields
}

Main structure of a node that is queued to be dialed

func NewQueuedNode

func NewQueuedNode(hInfo models.HostInfo) *QueuedNode

func (*QueuedNode) AddNegativeDial

func (n *QueuedNode) AddNegativeDial(baseT time.Time, deprecationTime time.Duration, state DialState)

func (*QueuedNode) AddPositiveDial

func (n *QueuedNode) AddPositiveDial(baseT time.Time)

func (*QueuedNode) IsDeprecable

func (n *QueuedNode) IsDeprecable() bool

func (*QueuedNode) IsEmpty

func (n *QueuedNode) IsEmpty() bool

func (*QueuedNode) NextDialTime

func (n *QueuedNode) NextDialTime() time.Time

func (*QueuedNode) ReadyToDial

func (n *QueuedNode) ReadyToDial() bool

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL