Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var SensorCmd = &cobra.Command{ Use: "sensor [nodes file]", Short: "Start a devp2p sensor that discovers other peers and will receive blocks and transactions.", Long: "If no nodes.json file exists, it will be created.", Args: cobra.MinimumNArgs(1), PreRunE: func(cmd *cobra.Command, args []string) (err error) { inputSensorParams.NodesFile = args[0] inputSensorParams.nodes, err = p2p.ReadNodeSet(inputSensorParams.NodesFile) if err != nil { log.Warn().Err(err).Msgf("Creating nodes file %v because it does not exist", inputSensorParams.NodesFile) } if len(inputSensorParams.TrustedNodesFile) > 0 { inputSensorParams.trustedNodes, err = p2p.ReadNodeSet(inputSensorParams.TrustedNodesFile) if err != nil { log.Warn().Err(err).Msgf("Trusted nodes file %v not found", inputSensorParams.TrustedNodesFile) } } if len(inputSensorParams.Bootnodes) > 0 { inputSensorParams.bootnodes, err = p2p.ParseBootnodes(inputSensorParams.Bootnodes) if err != nil { return fmt.Errorf("unable to parse bootnodes: %w", err) } } if inputSensorParams.NetworkID == 0 { return errors.New("network ID must be greater than zero") } if inputSensorParams.ShouldRunPprof { go func() { if pprofErr := http.ListenAndServe(fmt.Sprintf("localhost:%v", inputSensorParams.PprofPort), nil); pprofErr != nil { log.Error().Err(pprofErr).Msg("Failed to start pprof") } }() } inputSensorParams.privateKey, err = crypto.GenerateKey() if err != nil { return err } if len(inputSensorParams.KeyFile) > 0 { var privateKey *ecdsa.PrivateKey privateKey, err = crypto.LoadECDSA(inputSensorParams.KeyFile) if err != nil { log.Warn().Err(err).Msg("Key file was not found, generating a new key file") err = crypto.SaveECDSA(inputSensorParams.KeyFile, inputSensorParams.privateKey) if err != nil { return err } } else { inputSensorParams.privateKey = privateKey } } inputSensorParams.genesis, err = loadGenesis(inputSensorParams.GenesisFile) if err != nil { log.Error().Err(err).Msg("Failed to load genesis file") return err } inputSensorParams.nat, err = nat.Parse(inputSensorParams.NAT) if err != nil { log.Error().Err(err).Msg("Failed to parse NAT") return err } return nil }, RunE: func(cmd *cobra.Command, args []string) error { db := database.NewDatastore(cmd.Context(), database.DatastoreOptions{ ProjectID: inputSensorParams.ProjectID, SensorID: inputSensorParams.SensorID, MaxConcurrency: inputSensorParams.MaxDatabaseConcurrency, ShouldWriteBlocks: inputSensorParams.ShouldWriteBlocks, ShouldWriteBlockEvents: inputSensorParams.ShouldWriteBlockEvents, ShouldWriteTransactions: inputSensorParams.ShouldWriteTransactions, ShouldWriteTransactionEvents: inputSensorParams.ShouldWriteTransactionEvents, }) block, err := getLatestBlock(inputSensorParams.RPC) if err != nil { return err } head := p2p.HeadBlock{ Hash: block.Hash.ToHash(), TotalDifficulty: block.TotalDifficulty.ToBigInt(), Number: block.Number.ToUint64(), } opts := p2p.Eth66ProtocolOptions{ Context: cmd.Context(), Database: db, Genesis: &inputSensorParams.genesis, GenesisHash: common.HexToHash(inputSensorParams.GenesisHash), RPC: inputSensorParams.RPC, SensorID: inputSensorParams.SensorID, NetworkID: inputSensorParams.NetworkID, Peers: make(chan *enode.Node), Head: &head, HeadMutex: &sync.RWMutex{}, Count: &p2p.MessageCount{}, } config := ethp2p.Config{ PrivateKey: inputSensorParams.privateKey, BootstrapNodes: inputSensorParams.bootnodes, TrustedNodes: inputSensorParams.trustedNodes, MaxPeers: inputSensorParams.MaxPeers, ListenAddr: fmt.Sprintf(":%d", inputSensorParams.Port), DiscAddr: fmt.Sprintf(":%d", inputSensorParams.DiscoveryPort), Protocols: []ethp2p.Protocol{p2p.NewEth66Protocol(opts)}, DialRatio: inputSensorParams.DialRatio, NAT: inputSensorParams.nat, } if inputSensorParams.QuickStart { config.StaticNodes = inputSensorParams.nodes } server := ethp2p.Server{Config: config} log.Info().Str("enode", server.Self().URLv4()).Msg("Starting sensor") if err := server.Start(); err != nil { return err } defer server.Stop() ticker := time.NewTicker(2 * time.Second) defer ticker.Stop() signals := make(chan os.Signal, 1) signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) peers := make(p2p.NodeSet) for _, node := range inputSensorParams.nodes { peers[node.ID()] = node.URLv4() } for { select { case <-ticker.C: count := opts.Count.Load() opts.Count.Clear() log.Info().Interface("peers", server.PeerCount()).Interface("counts", count).Send() case peer := <-opts.Peers: if _, ok := peers[peer.ID()]; !ok { peers[peer.ID()] = peer.URLv4() if err := p2p.WriteNodeSet(inputSensorParams.NodesFile, peers); err != nil { log.Error().Err(err).Msg("Failed to write nodes to file") } } case <-signals: log.Info().Msg("Stopping sensor...") return nil } } }, }
SensorCmd represents the sensor command. This is responsible for starting a sensor and transmitting blocks and transactions to a database.
Functions ¶
This section is empty.
Types ¶
This section is empty.
Click to show internal directories.
Click to hide internal directories.