Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var SensorCmd = &cobra.Command{ Use: "sensor [nodes file]", Short: "Start a devp2p sensor that discovers other peers and will receive blocks and transactions.", Long: "If no nodes.json file exists, it will be created.", Args: cobra.MinimumNArgs(1), PreRunE: func(cmd *cobra.Command, args []string) (err error) { inputSensorParams.NodesFile = args[0] _, err = p2p.ReadNodeSet(inputSensorParams.NodesFile) if err != nil { log.Warn().Err(err).Msgf("Creating nodes file %v because it does not exist", inputSensorParams.NodesFile) } if len(inputSensorParams.StaticNodesFile) > 0 { inputSensorParams.staticNodes, err = p2p.ReadNodeSet(inputSensorParams.StaticNodesFile) if err != nil { log.Warn().Err(err).Msgf("Static nodes file %v not found", inputSensorParams.StaticNodesFile) } } if len(inputSensorParams.TrustedNodesFile) > 0 { inputSensorParams.trustedNodes, err = p2p.ReadNodeSet(inputSensorParams.TrustedNodesFile) if err != nil { log.Warn().Err(err).Msgf("Trusted nodes file %v not found", inputSensorParams.TrustedNodesFile) } } if len(inputSensorParams.Bootnodes) > 0 { inputSensorParams.bootnodes, err = p2p.ParseBootnodes(inputSensorParams.Bootnodes) if err != nil { return fmt.Errorf("unable to parse bootnodes: %w", err) } } if inputSensorParams.NetworkID == 0 { return errors.New("network ID must be greater than zero") } inputSensorParams.privateKey, err = crypto.GenerateKey() if err != nil { return err } if len(inputSensorParams.KeyFile) > 0 { var privateKey *ecdsa.PrivateKey privateKey, err = crypto.LoadECDSA(inputSensorParams.KeyFile) if err != nil { log.Warn().Err(err).Msg("Key file was not found, generating a new key file") err = crypto.SaveECDSA(inputSensorParams.KeyFile, inputSensorParams.privateKey) if err != nil { return err } } else { inputSensorParams.privateKey = privateKey } } if len(inputSensorParams.PrivateKey) > 0 { inputSensorParams.privateKey, err = crypto.HexToECDSA(inputSensorParams.PrivateKey) if err != nil { log.Error().Err(err).Msg("Failed to parse PrivateKey") return err } } inputSensorParams.nat, err = nat.Parse(inputSensorParams.NAT) if err != nil { log.Error().Err(err).Msg("Failed to parse NAT") return err } return nil }, RunE: func(cmd *cobra.Command, args []string) error { db, err := newDatabase(cmd.Context()) if err != nil { return err } block, err := getLatestBlock(inputSensorParams.RPC) if err != nil { return err } head := p2p.HeadBlock{ Hash: block.Hash.ToHash(), TotalDifficulty: block.TotalDifficulty.ToBigInt(), Number: block.Number.ToUint64(), } peersGauge := promauto.NewGauge(prometheus.GaugeOpts{ Namespace: "sensor", Name: "peers", Help: "The number of peers the sensor is connected to", }) msgCounter := promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "sensor", Name: "messages", Help: "The number and type of messages the sensor has sent and received", }, []string{"message", "url", "name", "direction"}) conns := p2p.NewConns(p2p.ConnsOptions{ MaxBlocks: inputSensorParams.MaxBlocks, BlocksCacheTTL: inputSensorParams.BlocksCacheTTL, }) opts := p2p.EthProtocolOptions{ Context: cmd.Context(), Database: db, GenesisHash: common.HexToHash(inputSensorParams.GenesisHash), RPC: inputSensorParams.RPC, SensorID: inputSensorParams.SensorID, NetworkID: inputSensorParams.NetworkID, Conns: conns, Head: &head, HeadMutex: &sync.RWMutex{}, ForkID: forkid.ID{Hash: [4]byte(inputSensorParams.ForkID)}, MsgCounter: msgCounter, MaxRequests: inputSensorParams.MaxRequests, RequestsCacheTTL: inputSensorParams.RequestsCacheTTL, } config := ethp2p.Config{ PrivateKey: inputSensorParams.privateKey, BootstrapNodes: inputSensorParams.bootnodes, StaticNodes: inputSensorParams.staticNodes, TrustedNodes: inputSensorParams.trustedNodes, MaxPeers: inputSensorParams.MaxPeers, ListenAddr: fmt.Sprintf(":%d", inputSensorParams.Port), DiscAddr: fmt.Sprintf(":%d", inputSensorParams.DiscoveryPort), DialRatio: inputSensorParams.DialRatio, NAT: inputSensorParams.nat, DiscoveryV4: !inputSensorParams.NoDiscovery, DiscoveryV5: !inputSensorParams.NoDiscovery, Protocols: []ethp2p.Protocol{ p2p.NewEthProtocol(66, opts), p2p.NewEthProtocol(67, opts), p2p.NewEthProtocol(68, opts), }, } server := ethp2p.Server{Config: config} log.Info().Str("enode", server.Self().URLv4()).Msg("Starting sensor") if err = server.Start(); err != nil { return err } defer server.Stop() events := make(chan *ethp2p.PeerEvent) sub := server.SubscribeEvents(events) defer sub.Unsubscribe() ticker := time.NewTicker(2 * time.Second) ticker1h := time.NewTicker(time.Hour) defer ticker.Stop() defer ticker1h.Stop() dnsLock := make(chan struct{}, 1) signals := make(chan os.Signal, 1) signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) if inputSensorParams.ShouldRunPprof { go handlePprof() } if inputSensorParams.ShouldRunPrometheus { go handlePrometheus() } go handleAPI(&server, msgCounter, conns) go handleRPC(conns, inputSensorParams.NetworkID) go handleDNSDiscovery(&server, dnsLock) for { select { case <-ticker.C: peersGauge.Set(float64(server.PeerCount())) db.WritePeers(cmd.Context(), server.Peers(), time.Now()) urls := []string{} for _, peer := range server.Peers() { urls = append(urls, peer.Node().URLv4()) } if err := removePeerMessages(msgCounter, urls); err != nil { log.Error().Err(err).Msg("Failed to clean up peer messages") } if err := p2p.WritePeers(inputSensorParams.NodesFile, urls); err != nil { log.Error().Err(err).Msg("Failed to write nodes to file") } case <-ticker1h.C: go handleDNSDiscovery(&server, dnsLock) case <-signals: log.Info().Msg("Stopping sensor...") return nil case event := <-events: log.Debug().Any("event", event).Send() case err := <-sub.Err(): log.Error().Err(err).Send() } } }, }
SensorCmd represents the sensor command. This is responsible for starting a sensor and transmitting blocks and transactions to a database.
Functions ¶
This section is empty.
Types ¶
This section is empty.
Click to show internal directories.
Click to hide internal directories.