multiplex

package
v1.0.0-mx.1
Published: Dec 23, 2024 License: Apache-2.0 Imports: 57 Imported by: 0

README

CometBFT Multiplex

The multiplex package provides an implementation of CometBFT that runs concurrent consensus instances on many different chains in parallel.

Implementation

A ChainRegistry interface is used for the initial configuration of seed nodes, for connecting to existing chains, and for custom state-sync, which is currently disabled. Importantly, when multiplex is enabled, we expect the genesis.json file to contain a GenesisDocSet JSON.

The snapsapp package implements a multi-network ABCI application that enables consensus events mapping for the client implementation with extension interfaces for several stages of a consensus instance:

  • CheckTx: Define auditing/reporting units for transactions;
  • PrepareProposal: Define pre-processing units for transaction data;
  • ProcessProposal: Define post-processing units for transaction data;
  • FinalizeBlock: Define processing units for block data;
  • Commit: Define auditing/reporting units for committed blocks.

The client package implements a default client integration for multiplex features that may be used to inject custom configuration and to mutate or otherwise use state machines as they are replicated on the networks.

Source code conventions

We define some simple conventions in the multiplex package that must be followed to improve readability of the source code and to provide a more standardized implementation.

  • Tests are colocated in a go package multiplex_test which imports multiplex.
  • Tests are written in files with a suffix of _test.go, e.g. chain_id_test.go.
  • Naming convention ChainAbc for structures with a ChainID, e.g. ChainDB.
  • Naming convention MultiplexAbc for ChainID mappings, e.g. MultiplexDB.
  • Naming convention NewMultiplexAbc() must return (MultiplexAbc, error).
  • Naming convention NewChainAbc() must return (ChainAbc, error).
  • Naming convention for imports with mx prefix for multiplex features.
  • Naming convention for imports with cmt prefix for cometbft features.

Configuration

A config.MultiplexConfig structure defines the multiplex state replication configuration for a CometBFT node connecting to one or many networks.

The MultiplexConfig#UserChains option is necessary for creating new replicated chains, while MultiplexConfig#ChainSeeds is necessary for connecting to existing ones.

Individual fields documentation can be found in config.MultiplexConfig.

Options helpers:
  • WithStrategy
  • WithSyncConfig
  • WithChainSeeds
  • WithUserChains
  • WithP2PStartPort
  • WithRPCStartPort

NewConfigOverwrite

NewConfigOverwrite updates a node configuration in place, overwriting the listen addresses of services such that there is one P2P port and one RPC port per replicated chain. The following port overwrites apply:

  • P2P: legacy 26656, multiplex 30001...3000x, where x is the index of the node
  • RPC: legacy 26657, multiplex 40001...4000x, where x is the index of the node

The index of a node generally represents the index of the ChainID as per the ChainRegistry#GetChains return value, e.g. given ChainIDs ['A','B','C'], the index of the node for chain 'B' is 1 and the index for chain 'A' is 0.
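As a rough illustration of the indexing described above, the following sketch sorts a set of ChainIDs and derives one P2P and one RPC port per chain. The helper name portsForChains, the start ports passed in main, and the rule "start port plus index" are assumptions made for this example only; the offsets actually applied by NewConfigOverwrite may differ.

```go
package main

import (
	"fmt"
	"sort"
)

// chainPorts holds the illustrative listen ports of one replicated chain.
type chainPorts struct {
	ChainID string
	P2P     int
	RPC     int
}

// portsForChains is a hypothetical helper (not part of the multiplex package).
// It orders the ChainIDs alphabetically and assigns startPort+index to each,
// which is an assumed rule used only for illustration.
func portsForChains(chainIDs []string, p2pStart, rpcStart int) []chainPorts {
	sorted := append([]string(nil), chainIDs...) // do not mutate the input
	sort.Strings(sorted)

	out := make([]chainPorts, 0, len(sorted))
	for idx, id := range sorted {
		out = append(out, chainPorts{ChainID: id, P2P: p2pStart + idx, RPC: rpcStart + idx})
	}
	return out
}

func main() {
	for _, p := range portsForChains([]string{"C", "A", "B"}, 30001, 40001) {
		fmt.Printf("%s p2p=%d rpc=%d\n", p.ChainID, p.P2P, p.RPC)
	}
}
```

Sorting before indexing mirrors the documented ordering of the ChainID slice, so the same chain always maps to the same pair of ports regardless of input order.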

This method also overwrites the P2P.Seeds configuration option such that each replicated chain uses its own seed nodes, and the WAL file is changed so that each replicated chain writes to a separate WAL-file.

Also, state-sync is forcefully disabled because the preferred method of synchronization with individual replicated chains is to use block-sync.

ReplicationStrategy

The ReplicationStrategy exports a string interface that determines the type of replication being executed on this node. This strategy is notably used to determine the type of node and if it should enable multiplex features.

We currently support two replication strategies:

  • "Network": The instance shall synchronize with replicated chains.
  • "Disable": The instance shall run as a legacy node, without multiplex.

The replication strategy of a node determines whether the node synchronizes with replicated chains or not. Nodes that are not configured to synchronize with replicated chains may only be used to synchronize with legacy cometbft blockchain networks, which are not compatible with node multiplexes.
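The idea of a string-based strategy that toggles multiplex features can be sketched as follows. The type and constant names below are hypothetical stand-ins and are not the package's config.ReplicationStrategy; only the two string values "Network" and "Disable" come from the text above.

```go
package main

import "fmt"

// replicationStrategy is a hypothetical string-backed type used for
// illustration; the real type lives in the config package.
type replicationStrategy string

const (
	strategyNetwork replicationStrategy = "Network" // synchronize with replicated chains
	strategyDisable replicationStrategy = "Disable" // run as a legacy node
)

// multiplexEnabled reports whether multiplex features would be enabled
// under the assumption that only "Network" enables them.
func (s replicationStrategy) multiplexEnabled() bool {
	return s == strategyNetwork
}

func main() {
	for _, s := range []replicationStrategy{strategyNetwork, strategyDisable} {
		fmt.Printf("%s: multiplex enabled = %v\n", s, s.multiplexEnabled())
	}
}
```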

GenesisDocSet

The GenesisDocSet consists of a slice of types.GenesisDoc objects which define the initial conditions for a CometBFT node multiplex, in particular their validator set, consensus parameters and ChainID.

We added an interface node.IChecksummedGenesisDoc based on the legacy interface to enable compatibility with legacy nodes that use a single types.GenesisDoc instance to connect to only one network without multiplex.

Importantly, when multiplex is enabled, we expect the genesis.json file to contain a GenesisDocSet JSON with one or many replicated chains.

Client interface

Extensions

We define the rules for configuration extensions, consensus extensions and data extensions, which may be used to overwrite configuration objects and to process or mutate data using custom client business logic, e.g. logic that involves calls to remote servers or that stores data in a separate database.

Interfaces

  • SyncConfigExtensionFn: Provides custom state-sync configuration values.
  • SeedConfigExtensionFn: Provides custom seed nodes configuration values.
  • ValidatorUpdateExtensionFn: Provides custom auditing/reporting units for validator updates.
  • ConsensusUpdateExtensionFn: Provides custom auditing/reporting units for consensus parameter updates.
  • CheckTxExtensionFn: Provides custom auditing/reporting units for transactions.
  • PrepareProposalExtensionFn: Provides custom pre-processing units for transactions data.
  • ProcessProposalExtensionFn: Provides custom post-processing units for transactions data.
  • FinalizeBlockExtensionFn: Provides custom processing units for blocks data.
  • CommitExtensionFn: Provides custom auditing/reporting units for committed blocks.
  • CheckMutationResultExtensionFn: Provides custom auditing units for state mutation results.

We provide several example implementations that basically just deep-copy the input. Obviously, if you are developing a custom extension, you would do more than just deep-copy input objects.

  • An example for SyncConfigExtensionFn is: DefaultSyncConfigExtension
  • An example for SeedConfigExtensionFn is: DefaultSeedConfigExtension
  • An example for ValidatorUpdateExtensionFn is: DefaultValidatorUpdateExtension
  • An example for ConsensusUpdateExtensionFn is: DefaultConsensusUpdateExtension
  • An example for CheckTxExtensionFn is: DefaultCheckTxExtension
  • An example for PrepareProposalExtensionFn is: DefaultPrepareProposalExtension
  • An example for ProcessProposalExtensionFn is: DefaultProcessProposalExtension
  • An example for FinalizeBlockExtensionFn is: DefaultFinalizeBlockExtension
  • An example for CommitExtensionFn is: DefaultCommitExtension
  • An example for CheckMutationResultExtensionFn is: DefaultCheckMutationResultExtension

ChainRegistry

A ChainRegistry interface is used for the initial configuration of seed nodes, for connecting to existing chains.

This structure defines a registry pattern contract which should be searchable by ChainID and by user address.

Note that the ChainID slice is ordered in ascending alphabetical order.

IMPORTANT: This structure requires the ChainID field to contain a user address of 20 bytes in hexadecimal format and a fingerprint of 8 bytes. e.g.: mx-chain-FF080888BE0F48DE88927C3F49215B96548273AB-3E547E3280313019
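The ChainID layout required above can be checked with a small parser. This is a minimal sketch: the function name parseExtendedChainID is hypothetical, and the "mx-chain-" prefix is inferred from the example ChainID rather than taken from the package's multiplexPrefix constant.

```go
package main

import (
	"encoding/hex"
	"errors"
	"fmt"
	"strings"
)

// parseExtendedChainID is a hypothetical helper that splits a ChainID of the
// form "mx-chain-<address>-<fingerprint>" and validates both parts.
// The prefix value is an assumption based on the documented example.
func parseExtendedChainID(chainID string) (string, string, error) {
	const prefix = "mx-chain-"
	if !strings.HasPrefix(chainID, prefix) {
		return "", "", errors.New("missing mx-chain- prefix")
	}

	parts := strings.Split(strings.TrimPrefix(chainID, prefix), "-")
	if len(parts) != 2 {
		return "", "", errors.New("expected <address>-<fingerprint>")
	}

	// The user address must decode to exactly 20 bytes.
	addr, err := hex.DecodeString(parts[0])
	if err != nil || len(addr) != 20 {
		return "", "", errors.New("user address must be 20 bytes of hex")
	}

	// The fingerprint must decode to exactly 8 bytes.
	fp, err := hex.DecodeString(parts[1])
	if err != nil || len(fp) != 8 {
		return "", "", errors.New("fingerprint must be 8 bytes of hex")
	}

	return parts[0], parts[1], nil
}

func main() {
	addr, fp, err := parseExtendedChainID(
		"mx-chain-FF080888BE0F48DE88927C3F49215B96548273AB-3E547E3280313019")
	fmt.Println(addr, fp, err)
}
```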

The ChainRegistry interface defines a contract for the methods:

  • ChainRegistry#HasChain: True when the ChainID is known by the peer.
  • ChainRegistry#GetChains: Returns an ordered slice of ChainID values.
  • ChainRegistry#GetSeeds: Returns a comma-separated list of seed nodes.
  • ChainRegistry#GetStateSyncConfig: Returns the custom state-sync config.
  • ChainRegistry#GetAddress: Finds a user address for a ChainID.
  • ChainRegistry#FindChain: Searches for a ChainID in the registry.

Note that we provide an internal implementation of the ChainRegistry interface with singletonChainRegistry which is the implementation used under-the-hood by NewChainRegistry.

Reactor

The Reactor implementation takes care of configuring node instances for the correct replicated blockchain networks. The reactor starts multiple listeners in parallel and sends messages on a channel to report a successful launch.

When a set of node listeners is ready, the multiplex reactor sends a message on its channel chainReadyCh which contains the ChainID of the chain that is being replicated. Once this has happened, the node can start syncing state and/or blocks, and start indexers, mempool, and other services.

This structure is responsible for handling incoming messages on one or more Channel instances whereby the following contract applies:

  • OnStart() must be called to set up replicated chain node listeners.
  • The p2p.Switch calls GetChannels() when a new reactor is added to it.
  • When a new peer joins our node, InitPeer() and AddPeer() are called.
  • When a peer is stopped, RemovePeer() is called.
  • When receiving messages on channels of a reactor, Receive() is called.
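The "start listeners in parallel, then report readiness on a channel" pattern described for the reactor can be sketched in isolation. The function waitForChains and its body are illustrative assumptions; the real reactor sets up P2P and RPC listeners where the comment indicates and is not reproduced here.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// waitForChains is a hypothetical stand-in for the reactor start-up: it
// launches one goroutine per chain, each reporting its ChainID on a channel
// once "ready", and collects the reported ChainIDs.
func waitForChains(chainIDs []string) []string {
	chainReadyCh := make(chan string)

	var wg sync.WaitGroup
	for _, id := range chainIDs {
		wg.Add(1)
		go func(chainID string) {
			defer wg.Done()
			// A real implementation would configure listeners here.
			chainReadyCh <- chainID
		}(id)
	}

	// Close the channel once every chain has reported readiness.
	go func() {
		wg.Wait()
		close(chainReadyCh)
	}()

	ready := make([]string, 0, len(chainIDs))
	for chainID := range chainReadyCh {
		// Per-chain services (sync, indexers, mempool) would start here.
		ready = append(ready, chainID)
	}
	sort.Strings(ready) // arrival order is nondeterministic
	return ready
}

func main() {
	fmt.Println(waitForChains([]string{"B", "A", "C"}))
}
```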

proxy.ChainConns

The proxy.ChainConns is a breaking upgrade to proxy.AppConns which passes a ChainID to connection methods such that the right connections are used for the different replicated chains.

Note that only one shared ABCI client is used by all replicated chains. On the other hand, we create x connections with the client, one per replicated chain.

Return types of methods defined by this interface are compatible with proxy.AppConns to prevent breaking the ABCI integration.

SnapsApp

SnapsApp defines a multi-network ABCI application that enables consensus events mapping for the client implementation.

Read-write mutexes are created to track initial heights on concurrent threads, as well as the currently working height in the process of finalizing and committing blocks. Extensions can be implemented to hook into the processes of creating block proposals, processing them, and/or auditing the data added with a committed block.

Note that only one instance of the SnapsApp application must be created for node multiplexes.

Protobuf

The multiplex package uses Protobuf messages for different purposes, e.g. for transporting snapshot metadata or node information.

We provide an extension to the api/ package using Protobuf .proto files in a custom folder multiplex/proto/. The package name cometbft.multiplex.v1 is used to extend the currently available CometBFT API Protobuf generation.

MultiNetworkNodeInfo

This Protobuf definition provides a p2p.NodeInfo implementation that is compatible with node multiplexes which are connected to multiple replicated chains.

Notable method implementations include, but are not limited to:

  • GetChains(): Get the list of ChainIDs of the replicated chains of a node.
  • GetListenAddrs(): Get the P2P listen addresses per replicated chain.
  • GetRPCAddresses(): Get the RPC listen addresses per replicated chain.

Note that the MultiNetworkNodeInfo Protobuf message is compatible with the legacy cometbft.p2p.v1.NodeInfo.

Generating from Protobuf definition

protoc -I=$GOPATH/src \
		-I=$GOPATH/pkg/mod/github.com/cosmos/gogoproto\@v1.6.0/ \
		-I=proto/ \
		-I=multiplex/proto/ \
		--gogofaster_out=api/ \
		multiplex/proto/cometbft/multiplex/v1/types.proto

mv api/github.com/ice-blockchain/cometbft/* api/cometbft/
rm -rf api/github.com

Testing

Multiple unit test suites are provided with the multiplex package. You can run one of these full unit test suites with the following commands:

# running the full unit test suites
go test github.com/ice-blockchain/cometbft/multiplex -test.v
go test github.com/ice-blockchain/cometbft/multiplex/client -test.v
go test github.com/ice-blockchain/cometbft/multiplex/snapsapp -test.v

Alternatively, you can also run individual unit tests or unit test suites using one of the following commands:

# running individual unit test suites
go test github.com/ice-blockchain/cometbft/multiplex -run 'TestMultiplexGenesis.*' -test.v
go test github.com/ice-blockchain/cometbft/multiplex -run 'TestMultiplexDB.*' -test.v
go test github.com/ice-blockchain/cometbft/multiplex -run 'TestMultiplexFS.*' -test.v
go test github.com/ice-blockchain/cometbft/multiplex -run 'TestMultiplexExtendedChainID.*' -test.v
go test github.com/ice-blockchain/cometbft/multiplex -run 'TestMultiplexChainState.*' -test.v
go test github.com/ice-blockchain/cometbft/multiplex -run 'TestMultiplexReactor.*' -test.v
go test github.com/ice-blockchain/cometbft/multiplex -run 'TestMultiplexP2P.*' -test.v
go test github.com/ice-blockchain/cometbft/multiplex/client -run 'TestMultiplexClient.*' -test.v
go test github.com/ice-blockchain/cometbft/multiplex/snapsapp -run 'TestABCI.*' -test.v

Linter

# running the pre-commit step(s)
`make lint`

The preferred linter is golangci-lint as included in the pre-commit steps which can also be run manually with the following command:

# running the linter manually
golangci-lint run -c .golangci.yml --fix --disable revive,iface,recvcheck

Runtime

A more comprehensive node setup guide should be provided in a separate document. This section merely lists the commands that have been modified or added as part of this implementation.

# configuring a node multiplex (requires users.json)
go run ./cmd/cometbft/main.go init --home /tmp/cometbftmx --multiplex

# starting the node multiplex (requires genesis.json)
go run ./cmd/cometbft/main.go multiplex --home /tmp/cometbftmx

References

This implementation is based on the CometBFT v1.x branch, which is still under active development. It is therefore important to keep track of updates committed to the upstream branch as listed here: cometbft-v1x.

Other resources

Documentation

Overview

The multiplex package provides an implementation of CometBFT that runs concurrent consensus instances on many different chains in parallel.

Implementation

A ChainRegistry interface is used for the initial configuration of seed nodes, for connecting to existing chains, and for custom state-sync, which is currently disabled. Importantly, when multiplex is enabled, we expect the `genesis.json` file to contain a GenesisDocSet JSON.

The `snapsapp` package implements a multi-network ABCI application that enables consensus events mapping for the client implementation with extension interfaces for several stages of a consensus instance:

- `CheckTx`: Define auditing/reporting units for transactions;
- `PrepareProposal`: Define pre-processing units for transaction data;
- `ProcessProposal`: Define post-processing units for transaction data;
- `FinalizeBlock`: Define processing units for block data;
- `Commit`: Define auditing/reporting units for committed blocks.

The `client` package implements a *default* client integration for multiplex features that may be used to *inject custom configuration* and to mutate or otherwise use state machines as they are replicated on the networks.

## Source code conventions

We define some simple conventions in the `multiplex` package that must be followed to improve readability of the source code and to provide a more standardized implementation.

- Tests are colocated in a go package `multiplex_test` which imports `multiplex`.
- Tests are written in files with a suffix of `_test.go`, e.g. `chain_id_test.go`.
- Naming convention `ChainAbc` for structures with a ChainID, e.g. `ChainDB`.
- Naming convention `MultiplexAbc` for ChainID mappings, e.g. `MultiplexDB`.
- Naming convention `NewMultiplexAbc()` must return `(MultiplexAbc, error)`.
- Naming convention `NewChainAbc()` must return `(ChainAbc, error)`.
- Naming convention for imports with `mx` prefix for multiplex features.
- Naming convention for imports with `cmt` prefix for cometbft features.

Configuration

A `config.MultiplexConfig` structure defines the multiplex state replication configuration for a CometBFT node connecting to one or many networks.

The `MultiplexConfig#UserChains` option is necessary for creating new replicated chains, while `MultiplexConfig#ChainSeeds` is necessary for connecting to existing ones.

Individual fields documentation can be found in config.MultiplexConfig.

## Options helpers:

- WithStrategy
- WithSyncConfig
- WithChainSeeds
- WithUserChains
- WithP2PStartPort
- WithRPCStartPort

## NewConfigOverwrite

NewConfigOverwrite updates a node configuration in place, overwriting the listen addresses of services such that there is one P2P port and one RPC port per replicated chain. The following port overwrites apply:

- P2P: legacy `26656`, multiplex `30001`...`3000x`, where x is the index of the node
- RPC: legacy `26657`, multiplex `40001`...`4000x`, where x is the index of the node

The *index of a node* generally represents the index of the ChainID as per the [ChainRegistry#GetChains] return value, e.g. given ChainIDs ['A','B','C'], the index of the node for chain 'B' is 1 and the index for chain 'A' is 0.

This method also overwrites the `P2P.Seeds` configuration option such that each replicated chain *uses its own seed nodes*, and the `WAL` file is changed so that each replicated chain *writes to a separate WAL-file*.

Also, state-sync is forcefully **disabled** because the preferred method of synchronization with individual replicated chains is to use **block-sync**.

## ReplicationStrategy

The [ReplicationStrategy] exports a string interface that determines the type of replication being executed on this node. This strategy is notably used to determine the type of node and if it should enable multiplex features.

We currently support two replication strategies:

- `"Network"`: The instance shall synchronize with replicated chains.
- `"Disable"`: The instance shall run as a legacy node, without multiplex.

The replication strategy of a node determines whether the node synchronizes with replicated chains or not. Nodes that are not configured to synchronize with replicated chains may only be used to synchronize with legacy cometbft blockchain networks, which are not compatible with node multiplexes.

## GenesisDocSet

The GenesisDocSet consists of a slice of `types.GenesisDoc` objects which define the initial conditions for a CometBFT node multiplex, in particular their validator set, consensus parameters and ChainID.

We added an interface `node.IChecksummedGenesisDoc` based on the legacy interface to enable compatibility with legacy nodes that use a single `types.GenesisDoc` instance to connect to only one network without multiplex.

Importantly, when multiplex is enabled, we expect the `genesis.json` file to contain a GenesisDocSet JSON with one or many replicated chains.

Client interface

## Extensions

We define the rules for *configuration extensions*, *consensus extensions* and *data extensions*, which may be used to overwrite configuration objects and to process or mutate data using *custom client business logic*, e.g. logic that involves calls to remote servers or that stores data in a separate database.

### Interfaces

  • [SyncConfigExtensionFn]: Provides custom state-sync configuration values.
  • [SeedConfigExtensionFn]: Provides custom seed nodes configuration values.
  • [ValidatorUpdateExtensionFn]: Provides custom auditing/reporting units for validator updates.
  • [ConsensusUpdateExtensionFn]: Provides custom auditing/reporting units for consensus parameter updates.
  • [CheckTxExtensionFn]: Provides custom auditing/reporting units for transactions.
  • [PrepareProposalExtensionFn]: Provides custom pre-processing units for transactions data.
  • [ProcessProposalExtensionFn]: Provides custom post-processing units for transactions data.
  • [FinalizeBlockExtensionFn]: Provides custom processing units for blocks data.
  • [CommitExtensionFn]: Provides custom auditing/reporting units for committed blocks.
  • [CheckMutationResultExtensionFn]: Provides custom auditing units for state mutation results.

We provide several example implementations that basically just *deep-copy* the input. Obviously, if you are developing a custom extension, you would do more than just deep-copy input objects.

  • An example for [SyncConfigExtensionFn] is: [DefaultSyncConfigExtension]
  • An example for [SeedConfigExtensionFn] is: [DefaultSeedConfigExtension]
  • An example for [ValidatorUpdateExtensionFn] is: [DefaultValidatorUpdateExtension]
  • An example for [ConsensusUpdateExtensionFn] is: [DefaultConsensusUpdateExtension]
  • An example for [CheckTxExtensionFn] is: [DefaultCheckTxExtension]
  • An example for [PrepareProposalExtensionFn] is: [DefaultPrepareProposalExtension]
  • An example for [ProcessProposalExtensionFn] is: [DefaultProcessProposalExtension]
  • An example for [FinalizeBlockExtensionFn] is: [DefaultFinalizeBlockExtension]
  • An example for [CommitExtensionFn] is: [DefaultCommitExtension]
  • An example for [CheckMutationResultExtensionFn] is: [DefaultCheckMutationResultExtension]

## ChainRegistry

A ChainRegistry interface is used for the initial configuration of seed nodes, for connecting to existing chains.

This structure defines a registry pattern contract which should be searchable by ChainID and by user address.

Note that the **ChainID slice is ordered in ascending alphabetical order**.

IMPORTANT: This structure requires the ChainID field to contain a user address of 20 bytes in hexadecimal format and a fingerprint of 8 bytes. e.g.: `mx-chain-FF080888BE0F48DE88927C3F49215B96548273AB-3E547E3280313019`

The ChainRegistry interface defines a contract for the methods:

- [ChainRegistry#HasChain]: True when the ChainID is known by the peer.
- [ChainRegistry#GetChains]: Returns an *ordered slice* of ChainID values.
- [ChainRegistry#GetSeeds]: Returns a comma-separated list of seed nodes.
- [ChainRegistry#GetStateSyncConfig]: Returns the custom state-sync config.
- [ChainRegistry#GetAddress]: Finds a user address for a ChainID.
- [ChainRegistry#FindChain]: Searches for a ChainID in the registry.

Note that we provide an internal implementation of the ChainRegistry interface with `singletonChainRegistry` which is the implementation used under-the-hood by NewChainRegistry.

## Reactor

The Reactor implementation takes care of configuring node instances for the correct replicated blockchain networks. The reactor starts multiple listeners in parallel and sends messages on a channel to report a successful launch.

When a set of node listeners is ready, the multiplex reactor sends a message on its channel `chainReadyCh` which contains the ChainID of the chain that is being replicated. Once this has happened, the node can start syncing state and/or blocks, and start indexers, mempool, and other services.

This structure is responsible for handling incoming messages on one or more `Channel` instances whereby the following contract applies:

- `OnStart()` must be called to set up replicated chain node listeners.
- The `p2p.Switch` calls `GetChannels()` when a new reactor is added to it.
- When a new peer joins our node, `InitPeer()` and `AddPeer()` are called.
- When a peer is stopped, `RemovePeer()` is called.
- When receiving messages on channels of a reactor, `Receive()` is called.

## proxy.ChainConns

The `proxy.ChainConns` is a breaking upgrade to `proxy.AppConns` which passes a ChainID to connection methods such that the right connections are used for the different replicated chains.

Note that only one shared ABCI client is used by all replicated chains. On the other hand, we create x connections with the client, one per replicated chain.

Return types of methods defined by this interface are compatible with `proxy.AppConns` to prevent breaking the ABCI integration.

SnapsApp

SnapsApp defines a multi-network ABCI application that enables consensus events mapping for the client implementation.

This application creates snapshots of full state machines, without filtering any of the included properties: ChainID, ConsensusParams, Validators, etc.

Read-write mutexes are created to track initial heights on concurrent threads, as well as the currently working height in the process of finalizing and committing blocks. Extensions can be implemented to hook into the processes of creating block proposals, processing them, and/or auditing the data added with a committed block.

Note that *only one instance* of the SnapsApp application must be created for node multiplexes.

Protobuf

The `multiplex` package uses Protobuf messages for different purposes, e.g. for transporting snapshot metadata or node information.

We provide an *extension* to the `api/` package using Protobuf `.proto` files in a custom folder `multiplex/proto/`. The package name `cometbft.multiplex.v1` is used to extend the currently available CometBFT API Protobuf generation.

## MultiNetworkNodeInfo

This Protobuf definition provides a `p2p.NodeInfo` implementation that is compatible with node multiplexes which are connected to multiple replicated chains.

Notable method implementations include, but are not limited to:

- `GetChains()`: Get the list of ChainIDs of the replicated chains of a node.
- `GetListenAddrs()`: Get the P2P listen addresses per replicated chain.
- `GetRPCAddresses()`: Get the RPC listen addresses per replicated chain.

Note that the `MultiNetworkNodeInfo` Protobuf message is *compatible* with the legacy `cometbft.p2p.v1.NodeInfo`.

### Generating from Protobuf definition

protoc -I=$GOPATH/src \
		-I=$GOPATH/pkg/mod/github.com/cosmos/gogoproto\@v1.6.0/ \
		-I=proto/ \
		-I=multiplex/proto/ \
		--gogofaster_out=api/ \
		multiplex/proto/cometbft/multiplex/v1/types.proto

mv api/github.com/ice-blockchain/cometbft/* api/cometbft/
rm -rf api/github.com

Testing

Multiple unit test suites are provided with the `multiplex` package. You can run one of these full unit test suites with the following commands:

# running the full unit test suites
go test github.com/ice-blockchain/cometbft/multiplex -test.v
go test github.com/ice-blockchain/cometbft/multiplex/client -test.v
go test github.com/ice-blockchain/cometbft/multiplex/snapsapp -test.v

Alternatively, you can also run individual unit tests or unit test suites using one of the following commands:

# running individual unit test suites
go test github.com/ice-blockchain/cometbft/multiplex -run 'TestMultiplexGenesis.*' -test.v
go test github.com/ice-blockchain/cometbft/multiplex -run 'TestMultiplexDB.*' -test.v
go test github.com/ice-blockchain/cometbft/multiplex -run 'TestMultiplexFS.*' -test.v
go test github.com/ice-blockchain/cometbft/multiplex -run 'TestMultiplexExtendedChainID.*' -test.v
go test github.com/ice-blockchain/cometbft/multiplex -run 'TestMultiplexChainState.*' -test.v
go test github.com/ice-blockchain/cometbft/multiplex -run 'TestMultiplexReactor.*' -test.v
go test github.com/ice-blockchain/cometbft/multiplex -run 'TestMultiplexP2P.*' -test.v
go test github.com/ice-blockchain/cometbft/multiplex/client -run 'TestMultiplexClient.*' -test.v
go test github.com/ice-blockchain/cometbft/multiplex/snapsapp -run 'TestABCI.*' -test.v

Linter

# running the pre-commit step(s)
`make lint`

The preferred linter is `golangci-lint` as included in the `pre-commit` steps which can also be run manually with the following command:

# running the linter manually
golangci-lint run -c .golangci.yml --fix --disable revive,iface,recvcheck

Runtime

A more comprehensive *node setup guide* should be provided in a separate document. This section merely lists the *commands* that have been modified or added as part of this implementation.

# configuring a node multiplex (requires users.json)
go run ./cmd/cometbft/main.go init --home /tmp/cometbftmx --multiplex

# starting the node multiplex (requires genesis.json)
go run ./cmd/cometbft/main.go multiplex --home /tmp/cometbftmx

References

This implementation is based on the CometBFT `v1.x` branch, which is still under active development. It is therefore important to keep track of updates committed to the upstream branch as listed here: cometbft-v1x.

## Links

- Source code for `multiplex`: multiplex
- Source code for `snapsapp`: snapsapp
- Source code for `client`: client
- Technical definition: multiplex-notion

## Other resources

- CometBFT v1.x Release Branch: CometBFT
- CometBFT v1.x Commits Log: cometbft-v1x

Index

Constants

const (
	// ReplicationChannel is used to send replicated chain updates.
	ReplicationChannel = byte(0x90)

	// Instance types.
	InstanceKeyConfig           = "config"
	InstanceKeyStorage          = "storage"
	InstanceKeyState            = "state"
	InstanceKeyStateStore       = "stateStore"
	InstanceKeyBlockStore       = "blockStore"
	InstanceKeyPrivValidator    = "privValidator"
	InstanceKeyDatabaseBlock    = "database/blockStore"
	InstanceKeyDatabaseState    = "database/state"
	InstanceKeyDatabaseIndex    = "database/txIndex"
	InstanceKeyDatabaseEvidence = "database/evidence"
	InstanceKeyP2PSwitch        = "p2p/switch"
	InstanceKeyP2PTransport     = "p2p/transport"
	InstanceKeyFlagBlockSync    = "flag/blockSync"

	// Services types.
	ServiceKeyEventBus         = "eventBus"
	ServiceKeyIndexers         = "indexers"
	ServiceKeyPruner           = "pruner"
	ServiceKeyMempoolReactor   = "reactor/mempool"
	ServiceKeyBlockSyncReactor = "reactor/blockSync"
	ServiceKeyConsensusReactor = "reactor/consensus"
	ServiceKeyEvidenceReactor  = "reactor/evidence"
	ServiceKeyNodeRuntime      = "runtime/node"
)

Variables

DefaultProtocolVersion populates the Block and P2P versions using the global values, but not the App.

Functions

func ChecksumGenesisDoc

func ChecksumGenesisDoc(genesisDoc *types.GenesisDoc) ([]byte, error)

ChecksumGenesisDoc creates a SHA256 checksum of a types.GenesisDoc.
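To make the checksum idea concrete, here is a minimal sketch that hashes a serialized genesis document with SHA256. The struct genesisDoc and the use of JSON as the serialization are assumptions for this example; how ChecksumGenesisDoc actually serializes a types.GenesisDoc before hashing is not stated in this document.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// genesisDoc is a reduced, hypothetical stand-in for types.GenesisDoc.
type genesisDoc struct {
	ChainID       string `json:"chain_id"`
	InitialHeight int64  `json:"initial_height"`
}

// checksumGenesisDoc serializes the document to JSON (an assumed encoding)
// and returns the 32-byte SHA256 digest of the result.
func checksumGenesisDoc(doc genesisDoc) ([]byte, error) {
	bz, err := json.Marshal(doc)
	if err != nil {
		return nil, err
	}
	sum := sha256.Sum256(bz)
	return sum[:], nil
}

func main() {
	sum, err := checksumGenesisDoc(genesisDoc{ChainID: "mx-chain-example", InitialHeight: 1})
	if err != nil {
		panic(err)
	}
	fmt.Println(hex.EncodeToString(sum))
}
```

Because the digest depends on the exact bytes being hashed, two nodes only agree on a checksum if they serialize the genesis document identically.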

func DisableReplicationStrategy

func DisableReplicationStrategy() config.ReplicationStrategy

DisableReplicationStrategy() returns the legacy node type which uses a mode of "Disable", i.e. it disables multiplex features and uses the legacy node implementation.

func EnsureNetworkFS

func EnsureNetworkFS(
	chainID ExtendedChainID,
	baseConfDir string,
	baseDataDir string,
) (string, string, error)

EnsureNetworkFS creates a filesystem structure for a single network: a user folder is located in both data/ and config/, and each user folder contains one subfolder per network, named using the ChainID.

Return order is: config folder, data folder, error.

func GenesisDocFromChainParams

func GenesisDocFromChainParams(params *mxp2p.ChainParams) (types.GenesisDoc, error)

GenesisDocFromChainParams creates a genesisDoc from transported ChainParams which should contain all the genesisDoc properties.

func GenesisDocToChainParams

func GenesisDocToChainParams(genesisDoc types.GenesisDoc) (*mxp2p.ChainParams, error)

GenesisDocToChainParams creates a ChainParams object from a GenesisDoc.

func GetMultiplexPrefix

func GetMultiplexPrefix() string

GetMultiplexPrefix returns the value for constant multiplexPrefix.

func HistoryReplicationStrategy

func HistoryReplicationStrategy() config.ReplicationStrategy

HistoryReplicationStrategy() returns the historical node type which uses a mode of "History", i.e. it does not synchronize with replicated chains.

func LoadChainsFromGenesisFile

func LoadChainsFromGenesisFile(genFile string) (map[string][]string, error)

LoadChainsFromGenesisFile reads a genesis.json configuration file and populates a mapping of ChainID values by user address.

This method may be used to fill [ChainRegistry.userChains].

CAUTION: Using this method, it is assumed that the `genesis.json` file contains a *set of genesis docs* as defined with GenesisDocSet.

IMPORTANT: This method requires the ChainID field to contain a user address of 20 bytes in hexadecimal format and an arbitrary fingerprint of 8 bytes. e.g.: `mx-chain-FF080888BE0F48DE88927C3F49215B96548273AB-3E547E3280313019`.
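
The mapping of ChainID values by user address can be pictured with the sketch below. It only assumes the ChainID layout shown in the example above (prefix, 40 hexadecimal characters of user address, 16 hexadecimal characters of fingerprint, separated by dashes); the helper names `userAddressOf` and `groupByUser` are hypothetical and not part of the package.

```go
package main

import (
	"fmt"
	"strings"
)

// userAddressOf extracts the user address from a ChainID of the form
// <prefix>-<user address>-<fingerprint>. The prefix may contain dashes.
// Only the field sizes are checked here, not the hexadecimal content.
func userAddressOf(chainID string) (string, error) {
	parts := strings.Split(chainID, "-")
	if len(parts) < 3 {
		return "", fmt.Errorf("malformed ChainID %q", chainID)
	}
	addr, fp := parts[len(parts)-2], parts[len(parts)-1]
	if len(addr) != 40 || len(fp) != 16 {
		return "", fmt.Errorf("unexpected field sizes in ChainID %q", chainID)
	}
	return addr, nil
}

// groupByUser builds a mapping of ChainID values by user address.
func groupByUser(chainIDs []string) (map[string][]string, error) {
	out := make(map[string][]string)
	for _, id := range chainIDs {
		addr, err := userAddressOf(id)
		if err != nil {
			return nil, err
		}
		out[addr] = append(out[addr], id)
	}
	return out, nil
}

func main() {
	byUser, err := groupByUser([]string{
		"mx-chain-FF080888BE0F48DE88927C3F49215B96548273AB-3E547E3280313019",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(byUser)
}
```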

func LoadSeedsFromFile

func LoadSeedsFromFile(file string) (map[string]string, error)

LoadSeedsFromFile reads a seeds.json configuration file and populates a mapping of seed nodes by unique ChainID.

This method may be used to parse the content of a `seeds.json` configuration file as required to configure the state-sync partners of a replicated chain.

The list of seed nodes is comma-separated and uses the format: id@host:port.
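
A minimal sketch of how such a comma-separated list could be split into its `id@host:port` components is shown below. The `seed` type and `parseSeeds` helper are hypothetical; the package may parse and validate seeds differently.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// seed is a hypothetical container for one parsed id@host:port entry.
type seed struct {
	ID, Host, Port string
}

// parseSeeds splits a comma-separated list of id@host:port entries.
func parseSeeds(list string) ([]seed, error) {
	var out []seed
	for _, entry := range strings.Split(list, ",") {
		entry = strings.TrimSpace(entry)
		if entry == "" {
			continue // tolerate trailing commas and empty lists
		}
		id, addr, ok := strings.Cut(entry, "@")
		if !ok || id == "" {
			return nil, fmt.Errorf("seed %q is missing a node ID", entry)
		}
		host, port, err := net.SplitHostPort(addr)
		if err != nil {
			return nil, fmt.Errorf("seed %q: %w", entry, err)
		}
		out = append(out, seed{ID: id, Host: host, Port: port})
	}
	return out, nil
}

func main() {
	seeds, err := parseSeeds("abc@10.0.0.1:30001, def@example.org:30002")
	if err != nil {
		panic(err)
	}
	for _, s := range seeds {
		fmt.Println(s.ID, s.Host, s.Port)
	}
}
```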

func MultiplexGenesisDocProviderFunc

func MultiplexGenesisDocProviderFunc(nodeCfg *config.Config) node.GenesisDocProvider

MultiplexGenesisDocProviderFunc returns a node.GenesisDocProvider that loads the GenesisDocSet from a config.GenesisFile() on the filesystem.

CAUTION: this method expects the genesis file to contain a GenesisDocSet.

func MultiplexTransportHandshake

func MultiplexTransportHandshake(
	c net.Conn,
	timeout time.Duration,
	nodeInfo p2p.NodeInfo,
) (p2p.NodeInfo, error)

MultiplexTransportHandshake implements p2p.TransportHandshakeFn to permit using node multiplexes to exchange mxp2p.MultiNetworkNodeInfo messages between peers and execute a handshake between nodeInfo and the returned p2p.NodeInfo instance.

Mimics the same behavior as the default implementation in p2p.MultiplexTransport.

func NetworkReplicationStrategy

func NetworkReplicationStrategy() config.ReplicationStrategy

NetworkReplicationStrategy() returns the replicator node type which uses a mode of "Network", i.e. it does synchronize with replicated chains.

func NewChainInstance

func NewChainInstance[ImplT any](
	chainID string,
	instance ImplT,
) *chainInstance[ImplT]

NewChainInstance creates a new generic instance attached to a ChainID.

func NewConfigOverwrite

func NewConfigOverwrite(
	baseConfig *config.Config,
	chainRegistry ChainRegistry,
	withChainID string,
) *config.Config

NewConfigOverwrite updates a node configuration in-place to overwrite the services listen addresses and uses the chainRegistry instance to retrieve seed nodes configuration and state-sync configuration.

This method uses NewConfigOverwriteWithParameters after having read the parameters from the chainRegistry.

func NewConfigOverwriteWithParameters

func NewConfigOverwriteWithParameters(
	baseConfig *config.Config,
	withChainID string,
	seedNodes string,
	syncConfig *config.StateSyncConfig,
	p2pPortOverwrite int,
	rpcPortOverwrite int,
) (*config.Config, error)

NewConfigOverwriteWithParameters updates a node configuration in-place to overwrite the services listen addresses such that there is one P2P port and one RPC port per replicated chain. The following port overwrites apply:

- P2P: legacy `26656`, multiplex `30001`...`3000x` with x the index of nodes
- RPC: legacy `26657`, multiplex `40001`...`4000x` with x the index of nodes
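
The port scheme above can be sketched as a simple offset from the start ports. This is an illustration only: the `portsForIndex` and `listenAddr` helpers are hypothetical, and it is an assumption that the first replicated chain (zero-based position 0 in the ordered chain list) receives ports `30001` and `40001`.

```go
package main

import "fmt"

const (
	p2pStartPort = 30001 // default multiplex P2P start port
	rpcStartPort = 40001 // default multiplex RPC start port
)

// portsForIndex returns the P2P and RPC ports for the chain at the given
// zero-based position (assumption: position 0 maps to the start ports).
func portsForIndex(index int) (p2p, rpc int) {
	return p2pStartPort + index, rpcStartPort + index
}

// listenAddr formats a TCP listen address for all interfaces.
func listenAddr(port int) string {
	return fmt.Sprintf("tcp://0.0.0.0:%d", port)
}

func main() {
	for i := 0; i < 3; i++ {
		p2p, rpc := portsForIndex(i)
		fmt.Println(listenAddr(p2p), listenAddr(rpc))
	}
}
```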

This method also overwrites the `P2P.Seeds` configuration option such that each replicated chain uses its own seed nodes, and the `WAL` file is changed so that each replicated chain writes to a separate WAL-file.

It returns the newly created *deep-copy* of the node configuration.

func NewLegacyNodeMultiplex

func NewLegacyNodeMultiplex(
	ctx context.Context,
	nodeCfg *config.Config,
	nodeKey *p2p.NodeKey,
	logger cmtlog.Logger,
	options ...node.Option,
) (
	MultiplexMap[*node.Node],
	*Reactor,
	error,
)

NewLegacyNodeMultiplex implements a **fallback to default** implementation of node.Node, such that *multiplex features are disabled* and that the implementation used is `node/node.go`.

Note that the returned Reactor is always nil with this method.

We provide this implementation as a fallback solution and to improve backwards-compatibility with the original `cometbft` source code.

func NewNodesMultiplex

func NewNodesMultiplex(
	ctx context.Context,
	acceptor client.Acceptor,
	globalCfg *config.Config,
	logger cmtlog.Logger,
	options ...node.Option,
) (
	MultiplexMap[*node.Node],
	*Reactor,
	error,
)

NewNodesMultiplex returns a new, ready to go, CometBFT Nodes Multiplex. Multiplex-mode refers to a multi-network replication strategy whereby concurrent consensus instances are enabled for all replicated chains.

Creates one p2p.NodeKey instance per nodes multiplex. This implies that the p2p.ID included in the format `id@host:port` is always the same for one nodes multiplex's listen addresses, i.e. the node ID is shared amongst all replicated chains.

Note also that this method does *not* call the Start() method for the created node instances. It is important to note that each node instance's Start() method must be called in a separate goroutine to permit concurrent consensus instances, blocks production and state machines replication.
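
The requirement to start each node in its own goroutine can be sketched as follows. The `starter` interface and `fakeNode` type are hypothetical stand-ins used to keep the example self-contained; in practice the values would come from the returned MultiplexMap.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// starter is a hypothetical abstraction over anything with a Start method.
type starter interface {
	Start() error
}

// fakeNode simulates a node; it fails to start when fail is true.
type fakeNode struct{ fail bool }

func (n *fakeNode) Start() error {
	if n.fail {
		return errors.New("node failed to start")
	}
	return nil
}

// startAll calls Start on every node in a separate goroutine and
// collects the errors by ChainID once all calls have returned.
func startAll(nodes map[string]starter) map[string]error {
	var (
		wg sync.WaitGroup
		mu sync.Mutex
	)
	errs := make(map[string]error)
	for chainID, n := range nodes {
		wg.Add(1)
		go func(chainID string, n starter) {
			defer wg.Done()
			if err := n.Start(); err != nil {
				mu.Lock()
				errs[chainID] = err
				mu.Unlock()
			}
		}(chainID, n)
	}
	wg.Wait()
	return errs
}

func main() {
	errs := startAll(map[string]starter{
		"chain-a": &fakeNode{},
		"chain-b": &fakeNode{fail: true},
	})
	fmt.Println(len(errs), "node(s) failed to start")
}
```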

CAUTION - EXPERIMENTAL: Running the following code in a production environment is strongly discouraged. Please use these features with caution as they are still being actively developed.

CAUTION: This method expects the genesis file to contain a GenesisDocSet.

func ValidateGenesisDocChecksum

func ValidateGenesisDocChecksum(database *ChainDB, genesisDoc *types.GenesisDoc) error

ValidateGenesisDocChecksum reads a genesis doc hash from the database or inserts it if it's the first time. If it's not the first time, this method will validate that the produced checksum matches the database one.
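
The insert-or-compare behavior described above can be sketched with an in-memory store. The `checksumStore` type and `validateChecksum` helper are hypothetical; the package works against a ChainDB instead.

```go
package main

import (
	"bytes"
	"fmt"
)

// checksumStore is a hypothetical in-memory replacement for the database,
// mapping a ChainID to the genesis doc checksum that was first seen.
type checksumStore map[string][]byte

// validateChecksum stores the checksum on first use and otherwise
// verifies that it matches the stored one.
func validateChecksum(store checksumStore, chainID string, checksum []byte) error {
	saved, ok := store[chainID]
	if !ok {
		// First time: persist a copy of the checksum.
		store[chainID] = append([]byte(nil), checksum...)
		return nil
	}
	if !bytes.Equal(saved, checksum) {
		return fmt.Errorf("genesis doc checksum mismatch for chain %q", chainID)
	}
	return nil
}

func main() {
	store := checksumStore{}
	fmt.Println(validateChecksum(store, "chain-a", []byte{0x01}))
	fmt.Println(validateChecksum(store, "chain-a", []byte{0x02}))
}
```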

func WithAcceptor

func WithAcceptor(
	acceptor client.Acceptor,
) func(*Reactor)

WithAcceptor is an option helper to inject a custom acceptor implementation which accepts a user address and an acceptor.

func WithBackend

func WithBackend(a Adapter) func(*MultiplexClient)

WithBackend is an option helper to overwrite the default adapter instance.

func WithChainSeeds

func WithChainSeeds(chainSeeds map[string]string) func(*config.MultiplexConfig)

WithChainSeeds is an option helper that allows you to overwrite the default (empty) ChainSeeds in [MultiplexConfig]. By default, this option is set to an empty map.

func WithNotifier

func WithNotifier(n ClientNotifier) func(*MultiplexClient)

WithNotifier is an option helper to overwrite the default client notifier.

func WithP2PStartPort

func WithP2PStartPort(p2pStartPort uint16) func(*config.MultiplexConfig)

WithP2PStartPort is an option helper that allows you to overwrite the default P2PStartPort in [MultiplexConfig]. By default, this option is set to 30001.

func WithRPCStartPort

func WithRPCStartPort(rpcStartPort uint16) func(*config.MultiplexConfig)

WithRPCStartPort is an option helper that allows you to overwrite the default RPCStartPort in [MultiplexConfig]. By default, this option is set to 40001.

func WithRoutines

func WithRoutines(jobs *BroadcastJobs) func(*MultiplexBackend)

WithRoutines is an option helper to overwrite the BroadcastJobs instance.

func WithStrategy

func WithStrategy(strategy config.ReplicationStrategy) func(*config.MultiplexConfig)

WithStrategy is an option helper that allows you to overwrite the default Strategy in [MultiplexConfig]. By default, this option is set to "Disable".

func WithSyncConfig

func WithSyncConfig(syncConfigs map[string]*config.StateSyncConfig) func(*config.MultiplexConfig)

WithSyncConfig is an option helper that allows you to overwrite the default SyncConfig in [MultiplexConfig]. By default, this option is set to an empty map.

func WithUserChains

func WithUserChains(userChains map[string][]string) func(*config.MultiplexConfig)

WithUserChains is an option helper that allows you to overwrite the default (empty) UserChains in [MultiplexConfig]. By default, this option is set to an empty map.
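
The option helpers above follow the functional options pattern. The sketch below shows the pattern with a hypothetical `multiplexConfig` struct; the field names are assumptions, and only the default values ("Disable", 30001, 40001, empty maps) are taken from the descriptions above.

```go
package main

import "fmt"

// multiplexConfig is a hypothetical, reduced stand-in for config.MultiplexConfig.
type multiplexConfig struct {
	Strategy     string
	P2PStartPort uint16
	RPCStartPort uint16
	ChainSeeds   map[string]string
}

// option mutates a configuration that is being built.
type option func(*multiplexConfig)

func withStrategy(s string) option {
	return func(c *multiplexConfig) { c.Strategy = s }
}

func withP2PStartPort(p uint16) option {
	return func(c *multiplexConfig) { c.P2PStartPort = p }
}

// newConfig applies the options on top of the documented defaults.
func newConfig(opts ...option) *multiplexConfig {
	cfg := &multiplexConfig{
		Strategy:     "Disable",
		P2PStartPort: 30001,
		RPCStartPort: 40001,
		ChainSeeds:   map[string]string{},
	}
	for _, opt := range opts {
		opt(cfg)
	}
	return cfg
}

func main() {
	cfg := newConfig(withStrategy("Network"), withP2PStartPort(31001))
	fmt.Println(cfg.Strategy, cfg.P2PStartPort, cfg.RPCStartPort)
}
```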

Types

type Adapter

type Adapter interface {
	client.Server

	// GetLogger should return a [cmtlog.Logger] instance.
	GetLogger() cmtlog.Logger

	// GetRoutines should return an implementation of [BroadcastJobs] methods.
	GetRoutines() *BroadcastJobs

	// GetLocalNetworkHeights should query the last block height and determine
	// a list of required networks. Iff the last block height is 1, the network
	// is considered unknown and may need to be explicitly created.
	GetLocalNetworkHeights(
		userAddress string,
		transactions ...client.Transaction,
	) (map[string]int64, []string)

	// FetchRelayAddresses should find the supported networks which it should
	// map (ChainID) to their respective listen addresses, and it returns a
	// slice of relays that produced errors, e.g. network error.
	FetchRelayAddresses(
		networks []string,
		relays []string,
	) (map[string][]string, []string)

	// DiscoverRelayNetworks should dial all other relays and perform
	// handshakes to retrieve a [MultiNetworkNodeInfo] from each of the relays.
	DiscoverRelayNetworks(
		localSwitch *p2p.Switch,
		relay string,
	) ([]string, []string, error)

	// AddTransactions should execute the CheckTx call to add individual
	// transactions to the mempool by ChainID.
	AddTransactions(
		userAddress string,
		transactions ...client.Transaction,
	) error

	// RemoveTransactions should remove transactions from the local mempool
	// if they were added already, e.g. using AddTransactions.
	RemoveTransactions(
		userAddress string,
		transactions ...client.Transaction,
	) error
}

Adapter defines a multiplex backend adapter.

Adapter is an interface that defines the rules for the implementation of a backend adapter as required by MultiplexClient. The backend adapter is notably responsible for communicating with relays and transporting data.

This interface also embeds a client.Server interface.

type BroadcastJobs

type BroadcastJobs struct {
	// Routine extensions/overwrites may be provided here.
	NodeRelayDialer NodeRelayDialerFn
	NodeReplRequest NodeReplRequestFn
	NetworksCreator NetworksCreatorFn
	RelaysBroadcast RelaysBroadcastFn
	CancelBroadcast CancelBroadcastFn
}

BroadcastJobs defines a multiplex background jobs implementation.

BroadcastJobs provides routines implementation for the broadcast process. This structure encapsulates routines implementation for further extension and the adapter instance injects the default implementation if necessary.

type CancelBroadcastFn

type CancelBroadcastFn func(
	context.Context,
	string,
	[]client.Transaction,
)

CancelBroadcastFn describes a function that may be run on a separate goroutine and which should broadcast a rollback message to healthy relays.

The method ignores errors from the mempool that occur when transactions are not found.

type ChainDB

type ChainDB struct {
	ChainID string
	dbm.DB
}

ChainDB embeds a dbm.DB instance and adds a ChainID.

type ChainDBContext

type ChainDBContext struct {
	ChainID string
	config.DBContext
}

ChainDBContext embeds a config.DBContext instance and adds a ChainID.

type ChainInstance

type ChainInstance interface {
	// GetChainID should return the ChainID.
	GetChainID() string

	// GetInstance should return the instance.
	GetInstance() any
}

The ChainInstance interface defines the contract for a generic instance mapped by ChainID and the instance may be of any type.

type ChainListenAddr

type ChainListenAddr struct {
	ChainID    string `json:"chain_id"`
	ListenAddr string `json:"listen_addr"`
}

ChainListenAddr contains a ChainID and a listen address.

func NewChainListenAddr

func NewChainListenAddr(chainID string, laddr string) ChainListenAddr

NewChainListenAddr wraps a listen address for a ChainID.

type ChainProtocolVersion

type ChainProtocolVersion struct {
	ChainID string `json:"chain_id"`
	P2P     uint64 `json:"p2p"`
	Block   uint64 `json:"block"`
	App     uint64 `json:"app"`
}

ChainProtocolVersion contains a ChainID and protocol versions for the software.

func NewChainProtocolVersion

func NewChainProtocolVersion(chainID string, ver p2p.ProtocolVersion) ChainProtocolVersion

NewChainProtocolVersion creates a ChainProtocolVersion from a ChainID and a legacy p2p.ProtocolVersion.

type ChainRegistry

type ChainRegistry interface {
	// AddChain should add a new ChainID for userAddress.
	AddChain(userAddress string, chainID string) ChainRegistry

	// HasChain should return true if the ChainID can be found.
	HasChain(chainID string) bool

	// GetChains should return an ordered slice of unique chain identifiers.
	GetChains() []string

	// GetStateSyncConfig should return the required configuration for state-sync.
	// This method should return an error if no sync config can be found.
	GetStateSyncConfig(chainID string) (*config.StateSyncConfig, error)

	// GetSeeds should return a comma-separated list of seed nodes (id@host:port).
	// This method should return an error if no seed nodes can be found.
	GetSeeds(chainID string) (string, error)

	// GetAddress should return the user address attached to a ChainID.
	// This method should return an error if no address can be found.
	GetAddress(chainID string) (string, error)

	// FindChain should search for a ChainID and return its index or -1.
	// This method should return an error if no address can be found.
	FindChain(chainID string) (int, error)
}

ChainRegistry defines a registry pattern contract which should be searchable by ChainID and by user address.

Note that it is preferable that the ChainID slice returned with [GetChains] is ordered to enable determinism on listing supported replicated chains.

A cacheable chain provider is provided with NewChainRegistry.

func NewChainRegistry

func NewChainRegistry(conf *config.MultiplexConfig, options ...func(ChainRegistry)) (ChainRegistry, error)

NewChainRegistry creates a chain registry using a config.MultiplexConfig configuration. It uses the UserChains field to create an ExtendedChainID per each pair of user address and ChainID.

Note that [GetSyncConfigExtension] and [GetSeedConfigExtension] may be changed in `callbacks.go` to use a different configuration extension.

It is safe to call the client.InjectSyncConfig extension and also the client.InjectChainSeeds extension because the caller is still preparing a ChainRegistry instance, as such, even if the extensions implement blocking processes, they won't impact *runtime* afterwards.

This method implementation supports concurrent calls. NewChainRegistry implements ChainRegistryProvider.

type ChainRegistryProvider

type ChainRegistryProvider func(*config.MultiplexConfig) (ChainRegistry, error)

ChainRegistryProvider defines a function type that provides an implementation satisfying the ChainRegistry interface.

type ChecksummedGenesisDocSet

type ChecksummedGenesisDocSet struct {
	GenesisDocs    GenesisDocSet
	Sha256Checksum []byte
}

ChecksummedGenesisDocSet combines a GenesisDocSet together with its SHA256 checksum.

func (*ChecksummedGenesisDocSet) DefaultGenesisDoc

func (c *ChecksummedGenesisDocSet) DefaultGenesisDoc() (*types.GenesisDoc, error)

DefaultGenesisDoc() implements IChecksummedGenesisDoc.

func (*ChecksummedGenesisDocSet) GenesisDocByChainID

func (c *ChecksummedGenesisDocSet) GenesisDocByChainID(
	chainID string,
) (*types.GenesisDoc, error)

GenesisDocByChainID() implements IChecksummedGenesisDoc. This method returns an error when the genesis doc cannot be found for the given ChainID.

func (*ChecksummedGenesisDocSet) GetChecksum

func (c *ChecksummedGenesisDocSet) GetChecksum() []byte

GetChecksum() implements IChecksummedGenesisDoc.

type ClientNotifier

type ClientNotifier interface {
	SetChannel(ch chan<- client.BroadcastStatus)
	GetChannel() chan<- client.BroadcastStatus

	Error(err error)
	Success(txHashes [][]byte)
}

ClientNotifier defines the contract for client status notifiers as they are used during broadcast operations to asynchronously notify the caller about exact broadcast status updates and errors.
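
A channel-backed notifier satisfying a contract of this shape could look like the sketch below. The `broadcastStatus` struct and its fields are hypothetical stand-ins for client.BroadcastStatus, whose actual definition is not shown here.

```go
package main

import (
	"errors"
	"fmt"
)

// broadcastStatus is a hypothetical stand-in for client.BroadcastStatus.
type broadcastStatus struct {
	Err      error
	TxHashes [][]byte
}

// errRejected is an example error used to demonstrate Error().
var errRejected = errors.New("rejected by relay")

// chanNotifier forwards status updates to a send-only channel.
// The channel must be set (and drained or buffered) before use,
// otherwise Error and Success block.
type chanNotifier struct {
	ch chan<- broadcastStatus
}

func (n *chanNotifier) SetChannel(ch chan<- broadcastStatus) { n.ch = ch }
func (n *chanNotifier) GetChannel() chan<- broadcastStatus   { return n.ch }

// Error reports a failed broadcast.
func (n *chanNotifier) Error(err error) { n.ch <- broadcastStatus{Err: err} }

// Success reports the hashes of the accepted transactions.
func (n *chanNotifier) Success(txHashes [][]byte) {
	n.ch <- broadcastStatus{TxHashes: txHashes}
}

func main() {
	ch := make(chan broadcastStatus, 2) // buffered so the sends do not block
	n := &chanNotifier{}
	n.SetChannel(ch)
	n.Success([][]byte{{0x01}})
	n.Error(errRejected)
	close(ch)
	for st := range ch {
		fmt.Println(st.Err, len(st.TxHashes))
	}
}
```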

type ExtendedChainID

type ExtendedChainID interface {
	fmt.Stringer

	// GetSeparator should return a fields separator.
	GetSeparator() string

	// GetUserAddress should return the user address in hexadecimal as present
	// in the formatted chain identifier.
	GetUserAddress() string

	// GetFingerprint should return the fingerprint in hexadecimal as present
	// in the formatted chain identifier.
	// The fingerprint is validated for its size; the content can be arbitrary.
	GetFingerprint() string

	// Format should return a formatted chain identifier that contains the
	// above listed fields: user address and fingerprint.
	Format() string
}

ExtendedChainID defines an interface to handle ChainID values that contain special fields, e.g. a user address or an arbitrary fingerprint.

func NewExtendedChainID

func NewExtendedChainID(userAddress, fingerprint string) (ExtendedChainID, error)

NewExtendedChainID creates an extended chain identifier which contains a user address and a fingerprint.

This method expects the userAddress and fingerprint parameters to contain hexadecimal payloads. It will check for correct size match for both the user address (20 bytes) and the arbitrary fingerprint (8 bytes).
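
The size checks described above can be sketched with `encoding/hex`. The `extendedChainID` struct below is a hypothetical illustration; the `mx-chain` prefix and the dash separator are taken from the ChainID example elsewhere in this documentation and are assumptions about the formatted output.

```go
package main

import (
	"encoding/hex"
	"fmt"
)

// extendedChainID is a hypothetical value type holding both fields.
type extendedChainID struct {
	userAddress string
	fingerprint string
}

// checkHexSize verifies that s is valid hexadecimal of exactly size bytes.
func checkHexSize(s string, size int) error {
	bz, err := hex.DecodeString(s)
	if err != nil {
		return err
	}
	if len(bz) != size {
		return fmt.Errorf("expected %d bytes, got %d", size, len(bz))
	}
	return nil
}

// newExtendedChainID validates a 20-byte user address and an 8-byte fingerprint.
func newExtendedChainID(userAddress, fingerprint string) (extendedChainID, error) {
	if err := checkHexSize(userAddress, 20); err != nil {
		return extendedChainID{}, fmt.Errorf("invalid user address: %w", err)
	}
	if err := checkHexSize(fingerprint, 8); err != nil {
		return extendedChainID{}, fmt.Errorf("invalid fingerprint: %w", err)
	}
	return extendedChainID{userAddress: userAddress, fingerprint: fingerprint}, nil
}

// String formats the identifier (assumed prefix "mx-chain" and separator "-").
func (c extendedChainID) String() string {
	return "mx-chain-" + c.userAddress + "-" + c.fingerprint
}

func main() {
	id, err := newExtendedChainID("FF080888BE0F48DE88927C3F49215B96548273AB", "3E547E3280313019")
	if err != nil {
		panic(err)
	}
	fmt.Println(id)
}
```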

func NewExtendedChainIDFromLegacy

func NewExtendedChainIDFromLegacy(chainID string) (ExtendedChainID, error)

NewExtendedChainIDFromLegacy extracts the fields of a chain identifier, notably the user address and the fingerprint, which must both be present.

The prefix is ignored to prevent using different prefixes than `mx-chain`.

type GenesisDocSet

type GenesisDocSet []types.GenesisDoc

GenesisDocSet defines the initial conditions for a CometBFT node multiplex, in particular their validator set and ChainID.

func GenesisDocSetFromFile

func GenesisDocSetFromFile(genDocSetFile string) (GenesisDocSet, error)

GenesisDocSetFromFile reads JSON data from a file and unmarshalls it into a GenesisDocSet.

func GenesisDocSetFromJSON

func GenesisDocSetFromJSON(jsonBlob []byte) (GenesisDocSet, error)

GenesisDocSetFromJSON unmarshalls JSON data into a GenesisDocSet.

func (GenesisDocSet) SaveAs

func (genDocSet GenesisDocSet) SaveAs(file string) error

SaveAs is a utility method for saving GenesisDocSet as a JSON file.

func (GenesisDocSet) SearchGenesisDocByChainID

func (genDocSet GenesisDocSet) SearchGenesisDocByChainID(
	chainID string,
) (doc types.GenesisDoc, found bool, err error)

SearchGenesisDocByChainID starts a search for a GenesisDoc by its ChainID.

func (GenesisDocSet) ValidateAndComplete

func (genDocSet GenesisDocSet) ValidateAndComplete() error

ValidateAndComplete checks that all necessary fields are present.

func (GenesisDocSet) ValidatorHash

func (genDocSet GenesisDocSet) ValidatorHash() []byte

ValidatorHash returns the Merkle root hash built using genesis doc's validator hashes (as leaves).

type MultiNetworkNodeInfo

type MultiNetworkNodeInfo struct {
	// Replication configuration
	Networks         []string               `json:"networks"` // contains ChainIDs
	ProtocolVersions []ChainProtocolVersion `json:"protocol_versions"`
	ListenAddrs      []ChainListenAddr      `json:"listen_addrs"` // accepting incoming
	RPCAddresses     []ChainListenAddr      `json:"rpc_addrs"`    // accepting RPC

	// Authenticate
	// TODO: replace with NetAddress
	DefaultNodeID p2p.ID `json:"id"`          // authenticated identifier
	ListenAddr    string `json:"listen_addr"` // default accepting incoming (first network)

	// Check compatibility.
	// Channels are HexBytes so easier to read as JSON
	Version  string            `json:"version"`  // major.minor.revision
	Channels cmtbytes.HexBytes `json:"channels"` // channels this node knows about

	// ASCIIText fields
	Moniker string                   `json:"moniker"` // arbitrary moniker
	Other   p2p.DefaultNodeInfoOther `json:"other"`   // other application specific data
}

MultiNetworkNodeInfo is the multiplex node information exchanged between two peers during the CometBFT P2P handshake.

func MultiNetworkNodeInfoFromProto

func MultiNetworkNodeInfoFromProto(pb *mxp2p.MultiNetworkNodeInfo) (MultiNetworkNodeInfo, error)

func (MultiNetworkNodeInfo) CompatibleWith

func (info MultiNetworkNodeInfo) CompatibleWith(otherInfo p2p.NodeInfo) error

CompatibleWith checks if two DefaultNodeInfo are compatible with each other.

This implementation of CompatibleWith verifies that at least one of the replicated chains is compatible with the other peer's replicated chains.

CONTRACT: two nodes are compatible if the Block version and network match and they have at least one channel in common.
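
The contract above can be sketched as follows. This is a simplified model, not the package's implementation: the `peerInfo` struct is hypothetical and reduces each peer to a map of ChainID to Block protocol version plus a list of channels.

```go
package main

import (
	"errors"
	"fmt"
)

// peerInfo is a hypothetical, reduced view of a multiplex node info.
type peerInfo struct {
	Chains   map[string]uint64 // ChainID -> Block protocol version
	Channels []byte
}

// shareChannel reports whether a and b have at least one channel in common.
func shareChannel(a, b []byte) bool {
	for _, x := range a {
		for _, y := range b {
			if x == y {
				return true
			}
		}
	}
	return false
}

// compatibleWith succeeds if the peers share a channel and at least one
// replicated chain with a matching Block protocol version.
func compatibleWith(local, remote peerInfo) error {
	if !shareChannel(local.Channels, remote.Channels) {
		return errors.New("peers have no channels in common")
	}
	for chainID, blockVersion := range local.Chains {
		if v, ok := remote.Chains[chainID]; ok && v == blockVersion {
			return nil
		}
	}
	return errors.New("peers have no compatible replicated chain")
}

func main() {
	local := peerInfo{Chains: map[string]uint64{"x": 11, "y": 11}, Channels: []byte{0x20, 0x30}}
	remote := peerInfo{Chains: map[string]uint64{"y": 11}, Channels: []byte{0x30}}
	fmt.Println(compatibleWith(local, remote))
}
```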

func (MultiNetworkNodeInfo) GetChannels

func (info MultiNetworkNodeInfo) GetChannels() cmtbytes.HexBytes

GetChannels returns the node's channels.

func (MultiNetworkNodeInfo) GetNodeInfo

func (info MultiNetworkNodeInfo) GetNodeInfo(chainID string) p2p.DefaultNodeInfo

GetNodeInfo returns a p2p.NodeInfo instance by chain ID.

func (MultiNetworkNodeInfo) HasChannel

func (info MultiNetworkNodeInfo) HasChannel(chID byte) bool

func (MultiNetworkNodeInfo) ID

func (info MultiNetworkNodeInfo) ID() p2p.ID

ID returns the node's peer ID.

func (MultiNetworkNodeInfo) NetAddress

func (info MultiNetworkNodeInfo) NetAddress() (*p2p.NetAddress, error)

NetAddress returns a NetAddress derived from the MultiNetworkNodeInfo - it includes the authenticated peer ID and the self-reported ListenAddr. Note that the ListenAddr is not authenticated and may not match the address actually dialed if it's an outbound peer.

func (MultiNetworkNodeInfo) ToProto

func (MultiNetworkNodeInfo) Validate

func (info MultiNetworkNodeInfo) Validate() error

Validate checks the self-reported MultiNetworkNodeInfo is safe. It returns an error if there are too many Channels, if there are any duplicate Channels, if the ListenAddr is malformed, or if the ListenAddr is a host name that can not be resolved to some IP.
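
The channel-related checks can be sketched as below. The limit of 16 channels is an assumption made for this example, and the ListenAddr checks are left out; `validateChannels` is a hypothetical helper.

```go
package main

import "fmt"

// maxNumChannels is an assumed upper bound for this sketch.
const maxNumChannels = 16

// validateChannels rejects channel lists that are too long or that
// contain duplicate channel identifiers.
func validateChannels(channels []byte) error {
	if len(channels) > maxNumChannels {
		return fmt.Errorf("too many channels: %d (max %d)", len(channels), maxNumChannels)
	}
	seen := make(map[byte]struct{}, len(channels))
	for _, ch := range channels {
		if _, dup := seen[ch]; dup {
			return fmt.Errorf("duplicate channel %#x", ch)
		}
		seen[ch] = struct{}{}
	}
	return nil
}

func main() {
	fmt.Println(validateChannels([]byte{0x20, 0x21}))
	fmt.Println(validateChannels([]byte{0x20, 0x20}))
}
```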

type MultiplexBackend

type MultiplexBackend struct {
	// contains filtered or unexported fields
}

MultiplexBackend defines a multiplex backend adapter implementation.

MultiplexBackend implements the Adapter interface for a multiplex client. This implementation makes use of an internal Reactor instance to read local networks heights and uses instances of p2p.Switch to communicate with relays about transactions broadcast operations.

Additionally, an internal [Acceptor] instance may be used to further extend the broadcast process, e.g. to call RollbackTx.

func NewServer

func NewServer(
	impl client.Acceptor,
	nodeConfig *config.Config,
	nodeLogger cmtlog.Logger,
	options ...func(*MultiplexBackend),
) (*MultiplexBackend, error)

NewServer initializes a new MultiplexBackend around an empty multiplex configuration and prepares the node backend by starting the reactor and configuring the necessary node services, i.e. event bus, mempool, etc.

The internal Reactor instance will be started when calling this method, and individual node.Node instances can be retrieved using the reactor's services registry: [Reactor#GetServiceProvider].

See also: NewNodesMultiplex

func (MultiplexBackend) AddTransactions

func (b MultiplexBackend) AddTransactions(
	userAddress string,
	transactions ...client.Transaction,
) error

AddTransactions executes the CheckTx call to add individual transactions to the mempool by ChainID.

This method is called by [BroadcastTx] when the transaction is ready to be broadcast to all other relays. Adding the transaction to the mempool effectively marks the transaction as locally accepted. AddTransactions implements Adapter.

func (MultiplexBackend) Close

func (b MultiplexBackend) Close() error

Close implements io.Closer

func (MultiplexBackend) DefaultCancelBroadcastRoutine

func (b MultiplexBackend) DefaultCancelBroadcastRoutine() CancelBroadcastFn

DefaultCancelBroadcastRoutine broadcasts a rollback message to healthy relays in case any of the relays has already included the transactions in their mempool. The mempool should call [Acceptor#RollbackTx] upon receiving this message.

func (MultiplexBackend) DefaultNetworksCreatorRoutine

func (b MultiplexBackend) DefaultNetworksCreatorRoutine() NetworksCreatorFn

DefaultNetworksCreatorRoutine communicates with relays about missing networks as described by chainIDs. If the relays return an empty response, it means that we must create a new network.

This method creates a new network genesis using [Reactor#MustCreateNetwork], then injects a node runtime using [Reactor#MustInjectNodeRuntime].

func (MultiplexBackend) DefaultNodeRelayDialerRoutine

func (b MultiplexBackend) DefaultNodeRelayDialerRoutine() NodeRelayDialerFn

DefaultNodeRelayDialerRoutine opens new connections to relays if necessary. In case an error happens while trying to connect to relays, an error will be pushed on a pre-allocated notifier.

This method will use the local events switch to open a single connection with each relay. This connection is used to retrieve a MultiNetworkNodeInfo.

func (MultiplexBackend) DefaultNodeReplRequestRoutine

func (b MultiplexBackend) DefaultNodeReplRequestRoutine() NodeReplRequestFn

DefaultNodeReplRequestRoutine asks relays to replicate a network by attaching the corresponding ChainParams.

This method broadcasts a mxp2p.ChainReplicationRequest message to relays, to ask them to replicate a chain using the ChainParams.

func (MultiplexBackend) DefaultRelaysBroadcastRoutine

func (b MultiplexBackend) DefaultRelaysBroadcastRoutine() RelaysBroadcastFn

DefaultRelaysBroadcastRoutine broadcasts all transactions to relays and verifies their respective acceptance of the transaction batch.

If any broadcast to other relays produces an error, the complete transaction batch will be discarded, and a rollback message will be broadcast to other relay's mempool reactors.

func (MultiplexBackend) DiscoverRelayNetworks

func (b MultiplexBackend) DiscoverRelayNetworks(
	localSwitch *p2p.Switch,
	relay string,
) (
	networks []string,
	listenAddrs []string,
	err error,
)

DiscoverRelayNetworks dials the relay using a local p2p.Switch instance to perform a handshake and finally to retrieve a MultiNetworkNodeInfo.

TODO(midas): replace localSwitch.GetPeerConfig() to avoid using local switch. DiscoverRelayNetworks implements Adapter.

func (MultiplexBackend) FetchRelayAddresses

func (b MultiplexBackend) FetchRelayAddresses(
	networks []string,
	relays []string,
) (
	chainRelays map[string][]string,
	errorRelays []string,
)

FetchRelayAddresses uses DiscoverRelayNetworks once for each relay to find the supported networks and their respective listen addresses.

TODO(midas): shouldn't need to use the local p2p.Switch for peerConfig. FetchRelayAddresses implements Adapter.

func (MultiplexBackend) GetAcceptor

func (b MultiplexBackend) GetAcceptor() client.Acceptor

GetAcceptor returns the injected client.Acceptor implementation.

GetAcceptor implements client.Server

func (MultiplexBackend) GetLocalNetworkHeights

func (b MultiplexBackend) GetLocalNetworkHeights(
	userAddress string,
	transactions ...client.Transaction,
) (map[string]int64, []string)

GetLocalNetworkHeights finds out about the last block height and determines a list of networks that must be created. The list of networks that must be created will also be present in the list of required networks. GetLocalNetworkHeights implements Adapter.

func (MultiplexBackend) GetLogger

func (b MultiplexBackend) GetLogger() cmtlog.Logger

GetLogger returns the cmtlog.Logger property.

func (MultiplexBackend) GetReactor

func (b MultiplexBackend) GetReactor() *Reactor

GetReactor returns the Reactor instance.

func (MultiplexBackend) GetRoutines

func (b MultiplexBackend) GetRoutines() *BroadcastJobs

GetRoutines returns an injected implementation of BroadcastJobs methods or the default implementations as defined with MultiplexBackend.

This is mainly used to overwrite routines for testing purposes. GetRoutines implements Adapter.

func (MultiplexBackend) MustStart

func (b MultiplexBackend) MustStart()

MustStart starts a replication backend and then blocks forever on an empty select.

MustStart implements client.Server

func (MultiplexBackend) RemoveTransactions

func (b MultiplexBackend) RemoveTransactions(
	userAddress string,
	transactions ...client.Transaction,
) error

RemoveTransactions removes transactions from the local mempool if they have been added already, e.g. using AddTransactions.

This method is called by [BroadcastTx] when a transaction rollback must be executed due to some of the healthy relays not accepting a batch. RemoveTransactions implements Adapter.

type MultiplexClient

type MultiplexClient struct {
	// contains filtered or unexported fields
}

MultiplexClient implements a multiplex client by providing an implementation for methods as defined in client.Client.

Note that a client.Acceptor implementation may be provided in NewClient.

func NewClient

func NewClient(
	options ...func(*MultiplexClient),
) *MultiplexClient

NewClient initializes a new MultiplexClient.

func (MultiplexClient) BroadcastTx

func (c MultiplexClient) BroadcastTx(
	ctx context.Context,
	userAddress string,
	relays []string,
	notifier chan<- client.BroadcastStatus,
	transactions ...client.Transaction,
)

BroadcastTx sends an error to a notifier if any of the transactions fails basic verification, or if we fail to get a majority approval for the broadcast operation from healthy relays.

IMPORTANT: It accepts a slice of relays which should be in the format `id@host`. The relays should not contain port numbers as the port number shall be discovered upon confirming that the relay is up and running.
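
A caller may want to check relay strings before passing them on. The sketch below verifies the `id@host` shape and rejects entries that carry a port. `validateRelay` is a hypothetical helper, not part of the package, and it does not handle IPv6 literals, which contain colons.

```go
package main

import (
	"fmt"
	"strings"
)

// validateRelay checks that relay uses the format id@host without a port.
// Limitation: IPv6 literals are rejected because they contain colons.
func validateRelay(relay string) error {
	id, host, ok := strings.Cut(relay, "@")
	if !ok || id == "" || host == "" {
		return fmt.Errorf("relay %q must use the format id@host", relay)
	}
	if strings.Contains(host, ":") {
		return fmt.Errorf("relay %q must not contain a port number", relay)
	}
	return nil
}

func main() {
	for _, relay := range []string{"abc@relay.example.org", "abc@relay.example.org:30001"} {
		fmt.Println(relay, validateRelay(relay))
	}
}
```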

The following steps define a complete broadcast process, in this order:

- Basic verifications and opening relay connections.
- New networks must be initialized explicitly.
- Wait for networks to be ready before adding to mempool.
- Add transactions to mempool, trigger broadcast to relays.
- Wait for remote (other relays) transaction acceptance.
- Transactions are now broadcast and accepted by all relays, i.e. the transaction broadcast happens only when consensus succeeded.

func (MultiplexClient) BroadcastTxRemoval

func (c MultiplexClient) BroadcastTxRemoval(
	ctx context.Context,
	userAddress string,
	relays []string,
	notifier chan<- client.BroadcastStatus,
	transactions ...client.Transaction,
)

BroadcastTxRemoval sends an error to a notifier if any of the removal operations fail verification, or if we fail to get a majority approval for the broadcast operation from healthy relays.

func (MultiplexClient) GetBackend

func (c MultiplexClient) GetBackend() Adapter

GetBackend returns the backend Adapter implementation.

func (MultiplexClient) SetBackend

func (c MultiplexClient) SetBackend(a Adapter)

SetBackend overwrites the backend Adapter instance.

type MultiplexDB

type MultiplexDB map[string]*ChainDB

MultiplexDB maps ChainIDs to database instances.

func NewMultiplexDB

func NewMultiplexDB(ctx *ChainDBContext) (multiplex MultiplexDB, err error)

NewMultiplexDB returns multiple databases using the DBBackend and DBDir specified in the Config and uses two levels of subfolders for user and chain.

Note that a separate folder is created for every replicated chain and that it is organized under a user address parent folder in the `data/` folder.

type MultiplexFS

type MultiplexFS map[string]string

MultiplexFS maps ChainIDs to filesystem paths (data/...)

func NewMultiplexFS

func NewMultiplexFS(conf *config.Config) (multiplex MultiplexFS, err error)

NewMultiplexFS returns multiple data filesystem paths using DefaultDataDir specified in the Config and uses two levels of subfolders for user and chain.

Note that a separate folder is created for every replicated chain and that it is organized under a user address parent folder in the `data/` folder.

type MultiplexMap

type MultiplexMap[ImplT any] map[string]*chainInstance[ImplT]

MultiplexMap is a generic map for which keys are ChainID values and values are of the type passed with ImplT - the underlying ChainInstance.

func DefaultNewNodesMultiplex

func DefaultNewNodesMultiplex(
	globalCfg *config.Config,
	logger cmtlog.Logger,
	options ...node.Option,
) (MultiplexMap[*node.Node], error)

DefaultNewNodesMultiplex returns a CometBFT Nodes Multiplex with default settings for the PrivValidator, ClientCreator, GenesisDoc, and DBProvider.

This method is used in `cmd/cometbft/main.go` to create a nodes multiplex.

See also: NewNodesMultiplex.

This method implements NodesMultiplexProvider.

type NamedMultiplexMap

type NamedMultiplexMap[ImplT any] map[string]MultiplexMap[ImplT]

NamedMultiplexMap is a wrapper around MultiplexMap that maps multiple instances of it by name. The first-level of keys are "multiplex names" and the second-level are from the MultiplexMap, i.e. ChainID.

NamedMultiplexMap[*ChainDB]{ "database": MultiplexMap[*ChainDB]{
  "mx-chain-...-...": &ChainDB{ ChainID: "...", DB: ... }
}}
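A two-level lookup over such a structure might look like the sketch below. The types `multiplexMap`, `namedMultiplexMap` and `chainDB` are simplified stand-ins introduced for illustration (the real MultiplexMap wraps values in an unexported chainInstance), and `lookup` is a hypothetical helper.

```go
package main

import "fmt"

// chainDB is a simplified stand-in for the package's ChainDB type.
type chainDB struct {
	ChainID string
}

// multiplexMap and namedMultiplexMap mirror the shape of MultiplexMap and
// NamedMultiplexMap, without the unexported chainInstance wrapper.
type multiplexMap[T any] map[string]T
type namedMultiplexMap[T any] map[string]multiplexMap[T]

// lookup resolves an instance by multiplex name first, then by ChainID.
func lookup[T any](m namedMultiplexMap[T], name, chainID string) (T, bool) {
	var zero T
	inner, ok := m[name]
	if !ok {
		return zero, false
	}
	v, ok := inner[chainID]
	return v, ok
}

func main() {
	registry := namedMultiplexMap[*chainDB]{
		"database": {"mx-chain-example": &chainDB{ChainID: "mx-chain-example"}},
	}
	if db, ok := lookup(registry, "database", "mx-chain-example"); ok {
		fmt.Println(db.ChainID)
	}
}
```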

type NetworksCreatorFn

type NetworksCreatorFn func(
	context.Context,
	map[string][]string,
	[]string,
	ClientNotifier,
	chan<- string,
)

NetworksCreatorFn describes a function that may be run on a separate goroutine and which should communicate with relays about missing networks.

A StatusNotifier instance contains a channel used to transmit errors. Also a string channel instance is accepted as newChainReadyCh where ChainIDs are pushed when a new network is ready (or is now known through relay).

type NodeRelayDialerFn

type NodeRelayDialerFn func(
	context.Context,
	map[string][]string,
	ClientNotifier,
)

NodeRelayDialerFn describes a function that may be run on a separate goroutine and which should open connections to relays if necessary.

A StatusNotifier instance contains a channel used to transmit errors.

type NodeReplRequestFn

type NodeReplRequestFn func(
	context.Context,
	[]string,
	string,
	ClientNotifier,
)

NodeReplRequestFn describes a function that may be run on a separate goroutine and which should open connections to relays if necessary.

A StatusNotifier instance contains a channel used to transmit errors.

type NodesMultiplexProvider

type NodesMultiplexProvider func(
	*config.Config,
	cmtlog.Logger,
	...node.Option,
) (MultiplexMap[*node.Node], error)

NodesMultiplexProvider takes a config and a logger and returns a ready-to-go nodes multiplex, i.e. [mx.MultiplexMap[*node.Node]].

Note that providers *must not* start node instances.

type Reactor

type Reactor struct {
	p2p.BaseReactor // BaseService + p2p.Switch
	// contains filtered or unexported fields
}

The Reactor implementation takes care of configuring node instances for the correct replicated blockchain networks. The reactor starts multiple listeners in parallel and sends messages on a channel to report about successful launch.

When a set of node listeners is ready, the multiplex reactor sends a message on its channel `chainReadyCh` which contains the ChainID of the chain that is being replicated. After this happens, the node is able to start syncing state and/or blocks, as well as starting indexers, mempool, and other services.

The Reactor structure implements snapsapp.Reactor.

func NewReactor

func NewReactor(
	nodeKey *p2p.NodeKey,
	nodeCfg *config.Config,
	logger cmtlog.Logger,
	chainRegistry ChainRegistry,
	genesisDocsProvider node.GenesisDocProvider,
	options ...func(*Reactor),
) *Reactor

NewReactor creates a new multiplex reactor around a p2p.NodeKey, a global node configuration with config.Config and a ChainRegistry.

Note that the genesisDocsProvider must be passed as well but is being used only at Start of the reactor, when the initial sm.State is loaded.

func (*Reactor) AddPeer

func (r *Reactor) AddPeer(peer p2p.Peer)

AddPeer implements p2p.Reactor.

func (*Reactor) AllocateNetwork

func (reactor *Reactor) AllocateNetwork(
	chainID string,
) error

AllocateNetwork allocates the necessary resources including the filesystem structure, the databases and a priv validator.

func (*Reactor) CreateAddressBooks

func (reactor *Reactor) CreateAddressBooks(
	ctx context.Context,
	networks []string,
) error

CreateAddressBooks validates the existence of a per-network filesystem path for configuration files, i.e. %rootDir%/config/%address%/%chain%/.

Then it configures a pex.AddrBook instance which is used to create the pex.Reactor, and then added to the event switch. The result is that one `addrbook.json` file exists per each replicated chain.

Note that this method must be called after [CreateTransportSwitchesWithReactors].

func (*Reactor) CreateConsensusInstanceReactors

func (reactor *Reactor) CreateConsensusInstanceReactors(
	ctx context.Context,
	chainID string,
	blockSync bool,
) error

CreateConsensusInstanceReactors creates all reactors necessary to setup a node for being consensus-ready with a network.

This method creates instances for the following services and reactors:

1) Create the mempool / mempool reactor
2) Create the evidence pool / evidence reactor
3) Create the block executor
4) Create block-sync reactor
5) Create consensus state / reactor

Afterwards, pointers to the created instances are registered on the reactor.

This method registers instances in the multiplexRegistry:

- `flag/blockSync`: A flag that determines whether block-sync must run.

This method also registers services in the servicesRegistry:

- `reactor/mempool`: The mempool reactor with Mempool ABCI conn.
- `reactor/blockSync`: The block-sync reactor with a block executor.
- `reactor/consensus`: The consensus reactor with WAL file overwrite.
- `reactor/evidence`: The evidence reactor around state- and block stores.

func (*Reactor) CreateTransportSwitch

func (reactor *Reactor) CreateTransportSwitch(
	chainID string,
	withNodeInfo MultiNetworkNodeInfo,
) error

CreateTransportSwitch creates a singular p2p.Switch attached to a per-network p2p.MultiplexTransport instance and attaches a relay's MultiNetworkNodeInfo to communicate with it.

func (*Reactor) CreateTransportSwitchesWithReactors

func (reactor *Reactor) CreateTransportSwitchesWithReactors(
	ctx context.Context,
	networks []string,
) error

CreateTransportSwitchesWithReactors initializes P2P transports using the legacy structure p2p.MultiplexTransport, but injects a *custom TLS handshake* implementation with MultiplexTransportHandshake.

Then, an event switch is initialized with p2p.Switch with the transport multiplex and an address book. At last, a pex.Reactor is also created.

Note that this method must be called after [CreateConsensusInstanceReactors].

This method also registers instances in the multiplexRegistry:

- `p2p/transport`: the p2p.MultiplexTransport instance per chain.
- `p2p/switch`: the p2p.Switch instance with all consensus reactors.

TODO(midas): TBI impact of ABCI query that uses /p2p/filter, discarded here.

TODO(midas): we must probably divide the max peers by the number of known networks.

func (*Reactor) GetChainRegistry

func (reactor *Reactor) GetChainRegistry() ChainRegistry

GetChainRegistry returns a ChainRegistry instance.

func (*Reactor) GetChannels

func (*Reactor) GetChannels() []*p2p.ChannelDescriptor

GetChannels implements p2p.Reactor.

func (*Reactor) GetChecksummedGenesisDocSet

func (reactor *Reactor) GetChecksummedGenesisDocSet() *ChecksummedGenesisDocSet

GetChecksummedGenesisDocSet returns a ChecksummedGenesisDocSet instance.

func (*Reactor) GetConfigsPaths

func (reactor *Reactor) GetConfigsPaths() map[string]string

GetConfigsPaths returns a MultiplexFS instance.

func (*Reactor) GetGenesisProvider

func (reactor *Reactor) GetGenesisProvider() genesisDocProviderFn

GetGenesisProvider returns a genesisDocProviderFn instance.

func (*Reactor) GetInstanceProvider

func (reactor *Reactor) GetInstanceProvider(multiplexName string) instanceProviderFn

GetInstanceProvider returns an [InstanceProvider] provider.

func (*Reactor) GetLogger

func (reactor *Reactor) GetLogger() cmtlog.Logger

GetLogger returns a cmtlog.Logger instance.

func (*Reactor) GetMultiNetworkNodeInfo

func (reactor *Reactor) GetMultiNetworkNodeInfo() *MultiNetworkNodeInfo

GetMultiNetworkNodeInfo returns a MultiNetworkNodeInfo pointer.

func (*Reactor) GetMultiplexConfig

func (reactor *Reactor) GetMultiplexConfig() *config.MultiplexConfig

GetMultiplexConfig returns a config.MultiplexConfig instance.

func (*Reactor) GetMultiplexProvider

func (reactor *Reactor) GetMultiplexProvider() multiplexProviderFn

GetMultiplexProvider returns a [MultiplexProvider] provider.

func (*Reactor) GetNetworks

func (reactor *Reactor) GetNetworks() []string

GetNetworks returns an ordered slice of ChainID values.

GetNetworks implements snapsapp.Reactor.

func (*Reactor) GetNodeConfig

func (reactor *Reactor) GetNodeConfig() *config.Config

GetNodeConfig returns a config.Config instance.

func (*Reactor) GetNodeKey

func (reactor *Reactor) GetNodeKey() *p2p.NodeKey

GetNodeKey returns the p2p.NodeKey instance.

func (*Reactor) GetServicesProvider

func (reactor *Reactor) GetServicesProvider() serviceProviderFn

GetServicesProvider returns a [ServiceProvider] provider.

func (*Reactor) GetStateStore

func (reactor *Reactor) GetStateStore(chainID string) sm.Store

GetStateStore returns a sm.Store.

GetStateStore implements snapsapp.Reactor.

func (*Reactor) GetStoragePaths

func (reactor *Reactor) GetStoragePaths() map[string]string

GetStoragePaths returns a MultiplexFS instance.

GetStoragePaths implements snapsapp.Reactor.

func (*Reactor) HasNetwork

func (reactor *Reactor) HasNetwork(chainID string) bool

HasNetwork returns true if the ChainID can be found.

HasNetwork implements snapsapp.Reactor.

func (*Reactor) InitMultiplexBlockStores

func (reactor *Reactor) InitMultiplexBlockStores() error

InitMultiplexBlockStores loads a bs.BlockStore multiplex using the reactor's instance provider to retrieve a blockstore database instance by ChainID.

First, a blockstore database instance is retrieved, then a non-snapshottable bs.BlockStore instance is created around the blockstore database instance for the respective replicated chain.

This method also registers instances in the multiplexRegistry:

- `blockStore`: the bs.BlockStore instance attached to the database.

func (*Reactor) InitMultiplexStates

func (reactor *Reactor) InitMultiplexStates() error

InitMultiplexStates loads a state multiplex using the reactor's instance provider to retrieve a state database instance by ChainID.

First, a state database instance is retrieved, then the GenesisDoc checksum is validated against the hash stored in the state database. Next, a sm.Store instance is created around the state database instance for the respective replicated chain.

And finally, this method will *load the state machine* using either the database or the GenesisDoc.

This method also registers instances in the multiplexRegistry:

- `state`: the sm.State state machine instances.
- `stateStore`: the sm.Store instance attached to the database.
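The checksum validation step mentioned for InitMultiplexStates can be pictured with the sketch below. It assumes a SHA-256 digest over raw genesis bytes compared against a stored digest; how the package actually serializes the GenesisDoc and where it stores the hash is not shown here, and both helper functions are hypothetical.

```go
package main

import (
	"bytes"
	"crypto/sha256"
	"fmt"
)

// genesisChecksum returns the SHA-256 digest of raw genesis bytes.
// Hypothetical helper: the real serialization may differ.
func genesisChecksum(genesisJSON []byte) []byte {
	sum := sha256.Sum256(genesisJSON)
	return sum[:]
}

// verifyGenesisChecksum compares the digest of genesisJSON with a digest
// that was stored earlier, and returns an error on mismatch.
func verifyGenesisChecksum(genesisJSON, stored []byte) error {
	got := genesisChecksum(genesisJSON)
	if !bytes.Equal(got, stored) {
		return fmt.Errorf("genesis checksum mismatch: got %x, stored %x", got, stored)
	}
	return nil
}

func main() {
	// The genesis content is a placeholder for illustration only.
	genesis := []byte(`{"chain_id":"mx-chain-example"}`)
	stored := genesisChecksum(genesis)
	fmt.Println(verifyGenesisChecksum(genesis, stored))
}
```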

func (*Reactor) InjectGenesisDoc

func (reactor *Reactor) InjectGenesisDoc(
	chainID string,
	confDir string,
	genesisDoc types.GenesisDoc,
) (*ChecksummedGenesisDocSet, error)

InjectGenesisDoc injects a genesisDoc in the reactor's GenesisDocSet and creates an individual genesis doc in the replicated chain config folder.

Note that the [GenesisDocSet#Sha256Checksum] is also updated here.

func (*Reactor) InjectNewNetwork

func (reactor *Reactor) InjectNewNetwork(
	chainID string,
) error

InjectNewNetwork bootstraps a new network using chainID and returns an error if any of the required steps do not succeed.

The following services and resources are configured, in this order:

- Creates the necessary filesystem structure.
- Creates the database connections for state and blocks.
- Creates a priv validator that will be included in GenesisDoc.
- Creates a new genesis document with one validator.
- Initializes a new state machine around the genesis doc.
- Initializes a configuration object for a new node.
- Updates the ABCI client and MultiNetworkNodeInfo.

See also: [InjectNewRuntime].

func (*Reactor) InjectNewRuntime

func (reactor *Reactor) InjectNewRuntime(
	ctx context.Context,
	chainID string,
	options ...node.Option,
) error

InjectNewRuntime begins by starting some mandatory node services such as the event bus, the priv validator and indexers. It then issues a handshake using the ABCI client and initializes the necessary consensus reactors: mempool, evidence, block executor and block-sync. Finally, it creates a Switch and AddrBook for the new network node.

After all required reactors are initialized, a node.Node instance is created and started in a parallel goroutine. A nil return value means that the node instance is running and that `Stop()` may be called.

This method executes parallel goroutines which may produce panics. It is recommended to recover from panics in the caller thread.

Caution: The method [InjectNewNetwork] must be called before.

func (*Reactor) InjectStateMachine

func (reactor *Reactor) InjectStateMachine(
	chainID string,
	icsGenesisDocSet *ChecksummedGenesisDocSet,
) error

InjectStateMachine injects a new state machine created from a GenesisDoc in the icsGenesisDocSet for a single chainID.

This method is responsible for initializing a sm.State from genesisDoc and a block store bs.Store using the previously created databases, i.e. AllocateNetwork must have been called before.

func (*Reactor) MakeNetworkConfigOverwrite

func (reactor *Reactor) MakeNetworkConfigOverwrite(
	chainID ExtendedChainID,
) (*config.Config, error)

MakeNetworkConfigOverwrite creates the config.Config instance for a new network chainID and overwrites the services listen addresses such that a node may be run using the current runtime.

func (*Reactor) MakeNetworkDatabases

func (reactor *Reactor) MakeNetworkDatabases(
	chainID ExtendedChainID,
	databases []string,
) (dbs map[string]dbm.DB, err error)

MakeNetworkDatabases creates and opens database instances for a network chainID and for one or many database names.

func (*Reactor) MakeNetworkFilesystem

func (reactor *Reactor) MakeNetworkFilesystem(
	chainID ExtendedChainID,
) (string, string, error)

MakeNetworkFilesystem creates the filesystem structure for a network chainID and returns the configuration and data paths.

Return order is: config folder, data folder, error.

func (*Reactor) MakeNetworkGenesis

func (reactor *Reactor) MakeNetworkGenesis(
	chainID ExtendedChainID,
	confDir string,
	privValidator types.PrivValidator,
) (*ChecksummedGenesisDocSet, error)

MakeNetworkGenesis creates the types.GenesisDoc for a new network chainID which contains a privValidator public key.

TODO(midas): validators voting power = 10, maybe needs change?

func (*Reactor) MakeNetworkStateMachine

func (reactor *Reactor) MakeNetworkStateMachine(
	genesisDoc *types.GenesisDoc,
	stateDB dbm.DB,
	blockstoreDB dbm.DB,
) (sm.State, sm.Store, *bs.BlockStore, error)

MakeNetworkStateMachine creates instances of sm.State, sm.Store and a bs.BlockStore for a new network chainID around its genesisDoc and the previously created stateDB and blockstoreDB.

func (*Reactor) MakeNetworkValidator

func (reactor *Reactor) MakeNetworkValidator(
	confDir string,
	dataDir string,
) (types.PrivValidator, error)

MakeNetworkValidator creates the priv validator for a new network chainID which will also be added to the new GenesisDoc.

func (*Reactor) OnStart

func (reactor *Reactor) OnStart() error

OnStart starts the multiplex reactor and must initialize the filesystem and database instances, as well as the block and state stores such that after being started, the reactor can be used to configure the running node services.

A custom deep-copied *config.Config is prepared for each replicated chain, and when all configuration is ready for a particular network, this method writes a message with the ChainID on its channel `chainReadyCh`.

This method registers instances in the multiplexRegistry:

- `config`: the configuration overwrite for each network.
- `storage`: the filesystem paths for each network.
- `state`: the sm.State state machine instances (InitMultiplexStates).
- `stateStore`: the sm.Store instance attached (InitMultiplexStates).
- `database/blockstore`: the blockstore databases (initMultiplexDatabases).
- `database/state`: the state machine databases (initMultiplexDatabases).
- `database/tx_index`: the tx_index databases (initMultiplexDatabases).
- `database/evidence`: the evidence databases (initMultiplexDatabases).
- `privValidator`: the PrivValidator instance (startNodeListeners).

This method also registers services in the servicesRegistry:

- `eventBus`: the event bus for block events (startNodeListeners).
- `indexers`: the transaction- and block indexers service (startNodeListeners).

CAUTION: This method spawns one new goroutine for every replicated chain.

func (*Reactor) OnStop

func (reactor *Reactor) OnStop()

OnStop stops the multiplex reactor and all the registered services that are running, including the ABCI client if it is running.

A *reversed* services sequence is used to implement the LIFO strategy when shutting down services as it is usual for services that are started last to be using previously created instances of other services.

As for the database multiplexes, they are used in an unordered format and no specific order is used to close the database connection because every database connection is independent of other database connections.
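The reversed (LIFO) shutdown sequence described for services can be sketched as below. The `stopper` interface and `namedService` type are hypothetical stand-ins; the package itself works with CometBFT service instances, and its error handling may differ from this sketch.

```go
package main

import "fmt"

// stopper is a hypothetical minimal interface used for this sketch.
type stopper interface {
	Stop() error
}

// namedService records its name in a shared log when stopped.
type namedService struct {
	name string
	log  *[]string
}

func (s namedService) Stop() error {
	*s.log = append(*s.log, s.name)
	return nil
}

// stopAllLIFO stops services in the reverse of their start order and
// returns the first error encountered, after attempting every stop.
func stopAllLIFO(started []stopper) error {
	var firstErr error
	for i := len(started) - 1; i >= 0; i-- {
		if err := started[i].Stop(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

func main() {
	var order []string
	services := []stopper{
		namedService{"eventBus", &order},
		namedService{"indexers", &order},
		namedService{"consensus", &order},
	}
	if err := stopAllLIFO(services); err != nil {
		fmt.Println("stop error:", err)
	}
	fmt.Println(order) // prints: [consensus indexers eventBus]
}
```

Stopping in reverse order lets a late-started service release its references before the services it depends on go away.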

func (*Reactor) PrepareConsensusInstanceWithReactor

func (reactor *Reactor) PrepareConsensusInstanceWithReactor(
	ctx context.Context,
	chainID string,
) error

PrepareConsensusInstanceWithReactor initializes a consensus handshake.

CAUTION: the consensus handshake (ABCI <> DB) must only be done when state sync does not execute. This handshake is intended to synchronize the database by executing blocks replay if necessary.

After the handshake, this method will *re-load the state machine*.

func (*Reactor) Receive

func (r *Reactor) Receive(e p2p.Envelope)

Receive implements p2p.Reactor.

func (*Reactor) RegisterInstance

func (reactor *Reactor) RegisterInstance(
	multiplexName string,
	chainID string,
	instance any,
)

RegisterInstance inserts a generic instance in the multiplexRegistry, by a given multiplexName and ChainID.

The multiplexMutex is RW-locked during the time this function takes to run.
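A registry guarded by a read-write mutex, in the spirit of RegisterInstance, might look like this sketch. The `registry` type and its methods are hypothetical and simplified; the actual multiplexRegistry layout is not shown in this documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a simplified stand-in for the reactor's multiplexRegistry.
type registry struct {
	mu        sync.RWMutex
	instances map[string]map[string]any
}

// register stores instance under multiplexName and chainID while holding
// the write lock, creating the maps on first use.
func (r *registry) register(multiplexName, chainID string, instance any) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.instances == nil {
		r.instances = make(map[string]map[string]any)
	}
	if r.instances[multiplexName] == nil {
		r.instances[multiplexName] = make(map[string]any)
	}
	r.instances[multiplexName][chainID] = instance
}

// get reads an instance while holding the read lock.
func (r *registry) get(multiplexName, chainID string) (any, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	v, ok := r.instances[multiplexName][chainID]
	return v, ok
}

func main() {
	r := &registry{}
	r.register("flag/blockSync", "mx-chain-example", true)
	v, ok := r.get("flag/blockSync", "mx-chain-example")
	fmt.Println(v, ok)
}
```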

func (*Reactor) RegisterNetwork

func (reactor *Reactor) RegisterNetwork(
	userAddress string,
	chainID string,
) error

RegisterNetwork updates the necessary resources to permit executing a new network node runtime for userAddress and chainID.

This method notably mutates the chainRegistry, the networks list, the ABCI client and the MultiNetworkNodeInfo instance of the reactor.

Subsequent dialing of this relay will include the new network.

func (*Reactor) RegisterService

func (reactor *Reactor) RegisterService(
	serviceName string,
	chainID string,
	service cmtlibs.Service,
)

RegisterService inserts a cmtlibs.Service instance in the registry by a given name and ChainID.

The servicesMutex is RW-locked during the time this function takes to run.

func (*Reactor) RemovePeer

func (r *Reactor) RemovePeer(peer p2p.Peer, _ any)

RemovePeer implements p2p.Reactor.

func (*Reactor) SetABCIClient

func (reactor *Reactor) SetABCIClient(abciClient proxy.ChainConns)

SetABCIClient sets a custom proxy.ChainConns ABCI client. Note that this method is only used in tests for now.

func (*Reactor) SetConfigsPaths

func (reactor *Reactor) SetConfigsPaths(fs MultiplexFS)

SetConfigsPaths sets a custom MultiplexFS map of configs paths.

func (*Reactor) SetLogger

func (reactor *Reactor) SetLogger(logger cmtlog.Logger)

SetLogger sets a custom cmtlog.Logger instance.

func (*Reactor) SetNodeInfo

func (reactor *Reactor) SetNodeInfo(nodeInfo *MultiNetworkNodeInfo)

SetNodeInfo sets a custom MultiNetworkNodeInfo instance. Note that this method is only used in tests for now.

func (*Reactor) SetServicesProvider

func (reactor *Reactor) SetServicesProvider(provider serviceProviderFn)

SetServicesProvider sets a custom services provider. Note that this method is only used in tests for now.

func (*Reactor) SetStoragePaths

func (reactor *Reactor) SetStoragePaths(fs MultiplexFS)

SetStoragePaths sets a custom MultiplexFS map of storage paths.

func (*Reactor) WaitForNetworks

func (reactor *Reactor) WaitForNetworks() error

WaitForNetworks waits for *all* configured networks to be readily configured. This method expects updates on the chainReadyCh private channel for each of the configured replicated chains. A sync.WaitGroup is used.

TODO(midas): add timeout functionality in case some networks are stuck?
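One possible shape for the timeout mentioned in the TODO is sketched below. It is a hypothetical variation, not the package's code: instead of a sync.WaitGroup it tracks pending ChainIDs in a set and gives up once a deadline passes. The function name and parameters are assumptions.

```go
package main

import (
	"fmt"
	"time"
)

// waitForChains blocks until every ChainID in networks has been received on
// readyCh, or until the timeout elapses. Hypothetical helper for illustration.
func waitForChains(networks []string, readyCh <-chan string, timeout time.Duration) error {
	pending := make(map[string]struct{}, len(networks))
	for _, id := range networks {
		pending[id] = struct{}{}
	}
	deadline := time.After(timeout)
	for len(pending) > 0 {
		select {
		case id := <-readyCh:
			delete(pending, id) // unknown or repeated ChainIDs are ignored
		case <-deadline:
			return fmt.Errorf("timed out waiting for %d network(s)", len(pending))
		}
	}
	return nil
}

func main() {
	readyCh := make(chan string, 2)
	readyCh <- "chain-a"
	readyCh <- "chain-b"
	err := waitForChains([]string{"chain-a", "chain-b"}, readyCh, time.Second)
	fmt.Println(err)
}
```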

type RelaysBroadcastFn

type RelaysBroadcastFn func(
	context.Context,
	map[string][]string,
	string,
	[]client.Transaction,
	ClientNotifier,
	chan<- string,
)

RelaysBroadcastFn describes a function that may be run on a separate goroutine and which should broadcast all transactions to relays.

A StatusNotifier instance contains a channel used to transmit errors. Also a string channel instance is accepted as relayAcceptTxCh where transaction hashes are pushed when a transaction has been accepted by at least 50%+1 of the healthy (currently active) relays.
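The "50%+1 of the healthy relays" rule can be read as a strict majority. The sketch below uses integer arithmetic under that reading; both function names are hypothetical, and the package may compute the threshold differently.

```go
package main

import "fmt"

// majorityThreshold returns the smallest number of acceptances that is
// strictly more than half of the healthy relays.
func majorityThreshold(healthy int) int {
	return healthy/2 + 1
}

// hasMajority reports whether the accepting relays reach the threshold.
// With zero healthy relays no majority can exist.
func hasMajority(accepted, healthy int) bool {
	return healthy > 0 && accepted >= majorityThreshold(healthy)
}

func main() {
	fmt.Println(hasMajority(3, 5), hasMajority(2, 4)) // prints: true false
}
```

Note that with an even number of healthy relays an exact half is not enough under this reading: 2 of 4 fails, 3 of 4 passes.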

type StatusNotifier

type StatusNotifier struct {
	// contains filtered or unexported fields
}

StatusNotifier defines a broadcast status notifier.

StatusNotifier implements the ClientNotifier interface.

func (*StatusNotifier) Error

func (n *StatusNotifier) Error(err error)

Error pushes a client.BroadcastStatus on the notifier and attaches the error.

func (*StatusNotifier) GetChannel

func (n *StatusNotifier) GetChannel() chan<- client.BroadcastStatus

GetChannel returns the send-only channel (`chan<-`) for this notifier.

func (*StatusNotifier) SetChannel

func (n *StatusNotifier) SetChannel(ch chan<- client.BroadcastStatus)

SetChannel registers a send-only channel (`chan<-`) for this notifier.

func (*StatusNotifier) Success

func (n *StatusNotifier) Success(txHashes [][]byte)

Success pushes a client.BroadcastStatus on the notifier and attaches a nil-error and accepted transaction hashes.
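The notifier pattern above can be sketched with a send-only channel as follows. The `broadcastStatus` struct and `notifier` type are hypothetical stand-ins: the real client.BroadcastStatus fields and the StatusNotifier internals are not shown in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// broadcastStatus is a hypothetical stand-in for client.BroadcastStatus.
type broadcastStatus struct {
	Err      error
	TxHashes [][]byte
}

// notifier holds a send-only channel (chan<-): it can push statuses,
// while the consumer keeps the receiving end.
type notifier struct {
	ch chan<- broadcastStatus
}

// Error pushes a status carrying the error.
func (n *notifier) Error(err error) {
	n.ch <- broadcastStatus{Err: err}
}

// Success pushes a status with a nil error and the accepted hashes.
func (n *notifier) Success(txHashes [][]byte) {
	n.ch <- broadcastStatus{TxHashes: txHashes}
}

func main() {
	// The channel is buffered so the pushes below do not block.
	ch := make(chan broadcastStatus, 2)
	n := &notifier{ch: ch}
	n.Success([][]byte{{0x01}})
	n.Error(errors.New("relay rejected transaction"))
	close(ch)
	for st := range ch {
		fmt.Println(st.Err, len(st.TxHashes))
	}
}
```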

Directories

Path Synopsis
This package provides a client contract for the multiplex library.
The `snapsapp` package implements a multi-network ABCI application that enables consensus events mapping for the client implementation.
