exchange

package
v1.20.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 5, 2023 License: MIT Imports: 33 Imported by: 0

Documentation

Overview

Example (PoolExchangeDagBetweenPoolNodes)

Example_poolExchangeDagBetweenPoolNodes starts up a pool with 2 nodes, stores a sample DAG in one node and fetches it via GraphSync from the other node.

server := startMockServer("127.0.0.1:4001")
defer func() {
	// Shutdown the server after test
	if err := server.Shutdown(context.Background()); err != nil {
		panic(err) // Handle the error as you see fit
	}
}()

const poolName = "1"
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

// Elevate log level to show internal communications.
if err := logging.SetLogLevel("*", "info"); err != nil {
	panic(err)
}

// Use a deterministic random generator to generate deterministic
// output for the example.
rng := rand.New(rand.NewSource(42))

// Instantiate the first node in the pool
pid1, _, err := crypto.GenerateECDSAKeyPair(rng)
if err != nil {
	panic(err)
}
h1, err := libp2p.New(libp2p.Identity(pid1))
if err != nil {
	panic(err)
}
n1, err := blox.New(blox.WithPoolName(poolName), blox.WithHost(h1))
if err != nil {
	panic(err)
}
if err := n1.Start(ctx); err != nil {
	panic(err)
}
defer n1.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h1.ID().String())

// Instantiate the second node in the pool
pid2, _, err := crypto.GenerateECDSAKeyPair(rng)
if err != nil {
	panic(err)
}
h2, err := libp2p.New(libp2p.Identity(pid2))
if err != nil {
	panic(err)
}
n2, err := blox.New(blox.WithPoolName(poolName), blox.WithHost(h2))
if err != nil {
	panic(err)
}
if err := n2.Start(ctx); err != nil {
	panic(err)
}
defer n2.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h2.ID().String())

// Connect n1 to n2.
h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL)
if err = h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil {
	panic(err)
}
h2.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), peerstore.PermanentAddrTTL)
if err = h2.Connect(ctx, peer.AddrInfo{ID: h1.ID(), Addrs: h1.Addrs()}); err != nil {
	panic(err)
}

// Authorize exchange between the two nodes
if err := n1.SetAuth(ctx, h1.ID(), h2.ID(), true); err != nil {
	panic(err)
}
if err := n2.SetAuth(ctx, h2.ID(), h1.ID(), true); err != nil {
	panic(err)
}

// Generate a sample DAG and store it on node 1 (n1) in the pool, which we will pull from n1
n1leaf := fluent.MustBuildMap(basicnode.Prototype.Map, 1, func(na fluent.MapAssembler) {
	na.AssembleEntry("this").AssignBool(true)
})
n1leafLink, err := n1.Store(ctx, n1leaf)
if err != nil {
	panic(err)
}
n1Root := fluent.MustBuildMap(basicnode.Prototype.Map, 2, func(na fluent.MapAssembler) {
	na.AssembleEntry("that").AssignInt(42)
	na.AssembleEntry("oneLeafLink").AssignLink(n1leafLink)
})
n1RootLink, err := n1.Store(ctx, n1Root)
if err != nil {
	panic(err)
}
fmt.Printf("%s stored IPLD data with links:\n    root: %s\n    leaf:%s\n", h1.ID(), n1RootLink, n1leafLink)

// Generate a sample DAG and store it on node 2 (n1) in the pool, which we will push to n1
n2leaf := fluent.MustBuildMap(basicnode.Prototype.Map, 1, func(na fluent.MapAssembler) {
	na.AssembleEntry("that").AssignBool(false)
})
n2leafLink, err := n2.Store(ctx, n2leaf)
if err != nil {
	panic(err)
}
n2Root := fluent.MustBuildMap(basicnode.Prototype.Map, 2, func(na fluent.MapAssembler) {
	na.AssembleEntry("this").AssignInt(24)
	na.AssembleEntry("anotherLeafLink").AssignLink(n2leafLink)
})
n2RootLink, err := n2.Store(ctx, n2Root)
if err != nil {
	panic(err)
}
fmt.Printf("%s stored IPLD data with links:\n    root: %s\n    leaf:%s\n", h1.ID(), n1RootLink, n1leafLink)

fmt.Println("exchanging by Pull...")
// Pull the sample DAG stored on node 1 from node 2 by only asking for the root link.
// Because fetch implementation is recursive, it should fetch the leaf link too.
if err := n2.Pull(ctx, h1.ID(), n1RootLink); err != nil {
	panic(err)
}

// Assert that n2 now has both root and leaf links
if exists, err := n2.Has(ctx, n1RootLink); err != nil {
	panic(err)
} else if !exists {
	panic("expected n2 to have fetched the entire sample DAG")
} else {
	fmt.Printf("%s successfully fetched:\n    link: %s\n    from %s\n", h2.ID(), n1RootLink, h1.ID())
	n, err := n2.Load(ctx, n1RootLink, basicnode.Prototype.Any)
	if err != nil {
		panic(err)
	}
	var buf bytes.Buffer
	if err := dagjson.Encode(n, &buf); err != nil {
		panic(err)
	}
	fmt.Printf("    content: %s\n", buf.String())
}
if exists, err := n2.Has(ctx, n1leafLink); err != nil {
	panic(err)
} else if !exists {
	panic("expected n2 to have fetched the entire sample DAG")
} else {
	fmt.Printf("%s successfully fetched:\n    link: %s\n    from %s\n", h2.ID(), n1leafLink, h1.ID())
	n, err := n2.Load(ctx, n1leafLink, basicnode.Prototype.Any)
	if err != nil {
		panic(err)
	}
	var buf bytes.Buffer
	if err := dagjson.Encode(n, &buf); err != nil {
		panic(err)
	}
	fmt.Printf("    content: %s\n", buf.String())
}

fmt.Println("exchanging by Push...")
// Push the sample DAG stored on node 2 to node 1 by only pushing the root link.
// Because Push implementation is recursive, it should push the leaf link too.
if err := n2.Push(ctx, h1.ID(), n2RootLink); err != nil {
	panic(err)
}

// Since push is an asynchronous operation, wait until background push is finished
// by periodically checking if link is present on node 1.
for {
	if exists, _ := n1.Has(ctx, n2RootLink); exists {
		break
	}
	select {
	case <-ctx.Done():
		panic(ctx.Err())
	default:
		time.Sleep(time.Second)
	}
}

// Assert that n1 now has both root and leaf links
if exists, err := n1.Has(ctx, n2RootLink); err != nil {
	panic(err)
} else if !exists {
	panic("expected n2 to have pushed the entire sample DAG")
} else {
	fmt.Printf("%s successfully pushed:\n    link: %s\n    from %s\n", h2.ID(), n1RootLink, h1.ID())
	n, err := n1.Load(ctx, n2RootLink, basicnode.Prototype.Any)
	if err != nil {
		panic(err)
	}
	var buf bytes.Buffer
	if err := dagjson.Encode(n, &buf); err != nil {
		panic(err)
	}
	fmt.Printf("    content: %s\n", buf.String())
}
if exists, err := n1.Has(ctx, n2leafLink); err != nil {
	panic(err)
} else if !exists {
	panic("expected n2 to have pushed the entire sample DAG")
} else {
	fmt.Printf("%s successfully pushed:\n    link: %s\n    from %s\n", h2.ID(), n1leafLink, h1.ID())
	n, err := n1.Load(ctx, n2leafLink, basicnode.Prototype.Any)
	if err != nil {
		panic(err)
	}
	var buf bytes.Buffer
	if err := dagjson.Encode(n, &buf); err != nil {
		panic(err)
	}
	fmt.Printf("    content: %s\n", buf.String())
}
Output:

Instantiated node in pool 1 with ID: QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
Instantiated node in pool 1 with ID: QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF
QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT stored IPLD data with links:
    root: bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy
    leaf:bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34
QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT stored IPLD data with links:
    root: bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy
    leaf:bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34
exchanging by Pull...
QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF successfully fetched:
    link: bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy
    from QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
    content: {"oneLeafLink":{"/":"bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34"},"that":42}
QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF successfully fetched:
    link: bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34
    from QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
    content: {"this":true}
exchanging by Push...
QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF successfully pushed:
    link: bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy
    from QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
    content: {"anotherLeafLink":{"/":"bafyreibzxn3zdk6e53h7cvx2sfbbroozp5e3kuvz6t4jfo2hfu4ic2ooc4"},"this":24}
QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF successfully pushed:
    link: bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34
    from QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
    content: {"that":false}
Example (ProvideAfterPull)
server := startMockServer("127.0.0.1:4001")
defer func() {
	// Shutdown the server after test
	if err := server.Shutdown(context.Background()); err != nil {
		panic(err) // Handle the error as you see fit
	}
}()

const poolName = "1"
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

// Elevate log level to show internal communications.
if err := logging.SetLogLevel("*", "info"); err != nil {
	panic(err)
}

// Use a deterministic random generator to generate deterministic
// output for the example.
rng := rand.New(rand.NewSource(42))

// Instantiate the first node in the pool
pid1, _, err := crypto.GenerateECDSAKeyPair(rng)
if err != nil {
	panic(err)
}
h1, err := libp2p.New(libp2p.Identity(pid1))
if err != nil {
	panic(err)
}
n1, err := blox.New(
	blox.WithPoolName(poolName),
	blox.WithTopicName(poolName),
	blox.WithHost(h1),
	blox.WithUpdatePoolName(updatePoolName),
	blox.WithBlockchainEndPoint("127.0.0.1:4001"),
	blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}),
	blox.WithExchangeOpts(
		exchange.WithDhtProviderOptions(
			dht.ProtocolExtension(protocol.ID("/"+poolName)),
			dht.ProtocolPrefix("/fula"),
			dht.Resiliency(1),
			dht.Mode(dht.ModeAutoServer),
		),
	),
)
if err != nil {
	panic(err)
}
if err := n1.Start(ctx); err != nil {
	panic(err)
}
defer n1.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h1.ID().String())

// Instantiate the second node in the pool
pid2, _, err := crypto.GenerateECDSAKeyPair(rng)
if err != nil {
	panic(err)
}
h2, err := libp2p.New(libp2p.Identity(pid2))
if err != nil {
	panic(err)
}
n2, err := blox.New(
	blox.WithPoolName(poolName),
	blox.WithTopicName(poolName),
	blox.WithHost(h2),
	blox.WithUpdatePoolName(updatePoolName),
	blox.WithBlockchainEndPoint("127.0.0.1:4001"),
	blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}),
	blox.WithExchangeOpts(
		exchange.WithDhtProviderOptions(
			dht.ProtocolExtension(protocol.ID("/"+poolName)),
			dht.ProtocolPrefix("/fula"),
			dht.Resiliency(1),
			dht.Mode(dht.ModeAutoServer),
		),
	),
)
if err != nil {
	panic(err)
}
if err := n2.Start(ctx); err != nil {
	panic(err)
}
defer n2.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h2.ID().String())

// Instantiate the third node in the pool
pid3, _, err := crypto.GenerateECDSAKeyPair(rng)
if err != nil {
	panic(err)
}
h3, err := libp2p.New(libp2p.Identity(pid3))
if err != nil {
	panic(err)
}
n3, err := blox.New(
	blox.WithPoolName(poolName),
	blox.WithTopicName(poolName),
	blox.WithHost(h3),
	blox.WithUpdatePoolName(updatePoolName),
	blox.WithBlockchainEndPoint("127.0.0.1:4001"),
	blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}),
	blox.WithExchangeOpts(
		exchange.WithDhtProviderOptions(
			dht.ProtocolExtension(protocol.ID("/"+poolName)),
			dht.ProtocolPrefix("/fula"),
			dht.Resiliency(1),
			dht.Mode(dht.ModeAutoServer),
		),
	),
)
if err != nil {
	panic(err)
}
if err := n3.Start(ctx); err != nil {
	panic(err)
}
defer n3.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h3.ID().String())

// Instantiate the fourth node not in the pool
pid4, _, err := crypto.GenerateECDSAKeyPair(rng)
if err != nil {
	panic(err)
}
h4, err := libp2p.New(libp2p.Identity(pid4))
if err != nil {
	panic(err)
}
n4, err := blox.New(
	blox.WithPoolName("0"),
	blox.WithTopicName("0"),
	blox.WithHost(h4),
	blox.WithUpdatePoolName(updatePoolName),
	blox.WithBlockchainEndPoint("127.0.0.1:4001"),
	blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}),
	blox.WithExchangeOpts(
		exchange.WithDhtProviderOptions(
			dht.ProtocolExtension(protocol.ID("/"+poolName)),
			dht.ProtocolPrefix("/fula"),
			dht.Resiliency(1),
			dht.Mode(dht.ModeAutoServer),
		),
	),
)
if err != nil {
	panic(err)
}
if err := n4.Start(ctx); err != nil {
	panic(err)
}
defer n4.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", "0", h4.ID().String())

// Connect n1 to n2 and n3 so that there is a path for gossip propagation.
// Note that we are not connecting n2 to n3 as they should discover
// each other via pool's iexist announcements.
h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL)
if err = h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil {
	panic(err)
}
h1.Peerstore().AddAddrs(h3.ID(), h3.Addrs(), peerstore.PermanentAddrTTL)
if err = h1.Connect(ctx, peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()}); err != nil {
	panic(err)
}

// Wait until the nodes discover each other
for {
	if len(h1.Peerstore().Peers()) >= 3 &&
		len(h2.Peerstore().Peers()) >= 3 &&
		len(h3.Peerstore().Peers()) >= 3 {
		break
	}
	select {
	case <-ctx.Done():
		panic(ctx.Err())
	default:
		time.Sleep(time.Second)
	}
}

fmt.Printf("Finally %s peerstore contains >=3 nodes:\n", h1.ID())

fmt.Printf("Finally %s peerstore contains >=3 nodes:\n", h2.ID())

fmt.Printf("Finally %s peerstore contains >=3 nodes:\n", h3.ID())

//Manually adding h4 as it is not in the same pool
h1.Peerstore().AddAddrs(h4.ID(), h4.Addrs(), peerstore.PermanentAddrTTL)
if err = h1.Connect(ctx, peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}); err != nil {
	panic(err)
}
//Manually adding h4 as it is not in the same pool
h2.Peerstore().AddAddrs(h4.ID(), h4.Addrs(), peerstore.PermanentAddrTTL)
if err = h2.Connect(ctx, peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}); err != nil {
	panic(err)
}
//Manually adding h4 as it is not in the same pool
h3.Peerstore().AddAddrs(h4.ID(), h4.Addrs(), peerstore.PermanentAddrTTL)
if err = h3.Connect(ctx, peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}); err != nil {
	panic(err)
}

// Wait until the fourth node discover others
for {
	if len(h4.Peerstore().Peers()) >= 4 {
		break
	}
	select {
	case <-ctx.Done():
		panic(ctx.Err())
	default:
		time.Sleep(time.Second)
	}
}

//Store a link in h1 and find providers from h2

// Generate a sample DAG and store it on node 1 (n1) in the pool, which we will pull from n1
n1leaf := fluent.MustBuildMap(basicnode.Prototype.Map, 1, func(na fluent.MapAssembler) {
	na.AssembleEntry("this").AssignBool(true)
})
n1leafLink, err := n1.Store(ctx, n1leaf)
if err != nil {
	panic(err)
}
n1Root := fluent.MustBuildMap(basicnode.Prototype.Map, 2, func(na fluent.MapAssembler) {
	na.AssembleEntry("that").AssignInt(42)
	na.AssembleEntry("oneLeafLink").AssignLink(n1leafLink)
})
n1RootLink, err := n1.Store(ctx, n1Root)
if err != nil {
	panic(err)
}
fmt.Printf("%s stored IPLD data with links:\n    root: %s\n    leaf:%s\n", h1.ID(), n1RootLink, n1leafLink)

// Generate a sample DAG and store it on node 2 (n1) in the pool, which we will push to n1
n2leaf := fluent.MustBuildMap(basicnode.Prototype.Map, 1, func(na fluent.MapAssembler) {
	na.AssembleEntry("that").AssignBool(false)
})
n2leafLink, err := n2.Store(ctx, n2leaf)
if err != nil {
	panic(err)
}
n2Root := fluent.MustBuildMap(basicnode.Prototype.Map, 2, func(na fluent.MapAssembler) {
	na.AssembleEntry("this").AssignInt(24)
	na.AssembleEntry("anotherLeafLink").AssignLink(n2leafLink)
})
n2RootLink, err := n2.Store(ctx, n2Root)
if err != nil {
	panic(err)
}
fmt.Printf("%s stored IPLD data with links:\n    root: %s\n    leaf:%s\n", h1.ID(), n2RootLink, n2leafLink)

//n1.UpdateDhtPeers(h2.Peerstore().Peers())
//n2.UpdateDhtPeers(h1.Peerstore().Peers())

err = n1.ProvideLinkByDht(n1RootLink)
if err != nil {
	fmt.Print("Error happened in ProvideLinkByDht")
	panic(err)
}
peerlist1, err := n2.FindLinkProvidersByDht(n1RootLink)
if err != nil {
	fmt.Print("Error happened in FindLinkProvidersByDht")
	panic(err)
}
// Iterate over the slice and print the peer ID of each AddrInfo
for _, addrInfo := range peerlist1 {
	fmt.Printf("Found %s on %s\n", n1RootLink, addrInfo.ID.String()) // ID.String() converts the peer ID to a string
}

err = n1.PingDht(h3.ID())
if err != nil {
	fmt.Print("Error happened in PingDht")
	panic(err)
}

fmt.Println("exchanging by Pull...")
// Pull the sample DAG stored on node 1 from node 2 by only asking for the root link.
// Because fetch implementation is recursive, it should fetch the leaf link too.
if err := n2.Pull(ctx, h1.ID(), n1RootLink); err != nil {
	panic(err)
}

// Assert that n2 now has both root and leaf links
if exists, err := n2.Has(ctx, n1RootLink); err != nil {
	panic(err)
} else if !exists {
	panic("expected n2 to have fetched the entire sample DAG")
} else {
	fmt.Printf("%s successfully fetched:\n    link: %s\n    from %s\n", h2.ID(), n1RootLink, h1.ID())
	n, err := n2.Load(ctx, n1RootLink, basicnode.Prototype.Any)
	if err != nil {
		panic(err)
	}
	var buf bytes.Buffer
	if err := dagjson.Encode(n, &buf); err != nil {
		panic(err)
	}
	fmt.Printf("    content: %s\n", buf.String())
}
if exists, err := n2.Has(ctx, n1leafLink); err != nil {
	panic(err)
} else if !exists {
	panic("expected n2 to have fetched the entire sample DAG")
} else {
	fmt.Printf("%s successfully fetched:\n    link: %s\n    from %s\n", h2.ID(), n1leafLink, h1.ID())
	n, err := n2.Load(ctx, n1leafLink, basicnode.Prototype.Any)
	if err != nil {
		panic(err)
	}
	var buf bytes.Buffer
	if err := dagjson.Encode(n, &buf); err != nil {
		panic(err)
	}
	fmt.Printf("    content: %s\n", buf.String())
}

fmt.Println("exchanging by Push...")
// Push the sample DAG stored on node 2 to node 1 by only pushing the root link.
// Because Push implementation is recursive, it should push the leaf link too.
if err := n2.Push(ctx, h1.ID(), n2leafLink); err != nil {
	panic(err)
}

peerlist3, err := n3.FindLinkProvidersByDht(n2leafLink)
if err != nil {
	fmt.Print("Error happened in FindLinkProvidersByDht3")
	panic(err)
}

// Iterate over the slice and print the peer ID of each AddrInfo
for _, addrInfo := range peerlist3 {
	fmt.Printf("Found %s on %s", n2leafLink, addrInfo.ID.String()) // ID.String() converts the peer ID to a string
}
Output:

Instantiated node in pool 1 with ID: QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
Instantiated node in pool 1 with ID: QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF
Instantiated node in pool 1 with ID: QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA
Instantiated node in pool 0 with ID: QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe
Finally QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT peerstore contains >=3 nodes:
Finally QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF peerstore contains >=3 nodes:
Finally QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA peerstore contains >=3 nodes:
QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT stored IPLD data with links:
    root: bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy
    leaf:bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34
QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT stored IPLD data with links:
    root: bafyreiekrzm6lpylw7hhrgvmzqfsek7og6ucqgpzns3ysbc5wj3imfrsge
    leaf:bafyreibzxn3zdk6e53h7cvx2sfbbroozp5e3kuvz6t4jfo2hfu4ic2ooc4
Found bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy on QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
exchanging by Pull...
QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF successfully fetched:
    link: bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy
    from QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
    content: {"oneLeafLink":{"/":"bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34"},"that":42}
QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF successfully fetched:
    link: bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34
    from QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
    content: {"this":true}
exchanging by Push...
Found bafyreibzxn3zdk6e53h7cvx2sfbbroozp5e3kuvz6t4jfo2hfu4ic2ooc4 on QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT

Index

Examples

Constants

View Source
const (
	FxExchangeProtocolID = "/fx.land/exchange/0.0.1"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type ConfigUpdater added in v1.14.0

type ConfigUpdater func([]peer.ID) error

type Exchange

type Exchange interface {
	Start(context.Context) error
	Push(context.Context, peer.ID, ipld.Link) error
	Pull(context.Context, peer.ID, ipld.Link) error
	SetAuth(context.Context, peer.ID, peer.ID, bool) error
	Shutdown(context.Context) error
	IpniNotifyLink(link ipld.Link)
}

type FxExchange added in v0.5.4

type FxExchange struct {
	// contains filtered or unexported fields
}

func NewFxExchange added in v0.5.4

func NewFxExchange(h host.Host, ls ipld.LinkSystem, o ...Option) (*FxExchange, error)

func (*FxExchange) FindProvidersDht added in v1.16.0

func (e *FxExchange) FindProvidersDht(l ipld.Link) ([]peer.AddrInfo, error)

func (*FxExchange) GetAuth added in v1.0.0

func (e *FxExchange) GetAuth(ctx context.Context) (peer.ID, error)

func (*FxExchange) GetAuthorizedPeers added in v1.14.0

func (e *FxExchange) GetAuthorizedPeers(ctx context.Context) ([]peer.ID, error)
func (e *FxExchange) IpniNotifyLink(link ipld.Link)

func (*FxExchange) PingDht added in v1.16.0

func (e *FxExchange) PingDht(p peer.ID) error

func (*FxExchange) ProvideDht added in v1.16.0

func (e *FxExchange) ProvideDht(l ipld.Link) error

func (*FxExchange) Pull added in v0.5.4

func (e *FxExchange) Pull(ctx context.Context, from peer.ID, l ipld.Link) error

func (*FxExchange) Push added in v0.5.4

func (e *FxExchange) Push(ctx context.Context, to peer.ID, l ipld.Link) error

func (*FxExchange) PutValueDht added in v1.16.0

func (e *FxExchange) PutValueDht(ctx context.Context, key string, val string) error

func (*FxExchange) SearchValueDht added in v1.16.0

func (e *FxExchange) SearchValueDht(ctx context.Context, key string) (string, error)

func (*FxExchange) SetAuth added in v0.8.3

func (e *FxExchange) SetAuth(ctx context.Context, on peer.ID, subject peer.ID, allow bool) error

func (*FxExchange) Shutdown added in v0.5.4

func (e *FxExchange) Shutdown(ctx context.Context) error

func (*FxExchange) Start added in v0.5.4

func (e *FxExchange) Start(ctx context.Context) error

func (*FxExchange) UpdateDhtPeers added in v1.16.0

func (e *FxExchange) UpdateDhtPeers(peers []peer.ID) error

type NoopExchange added in v0.5.4

type NoopExchange struct{}
func (n NoopExchange) IpniNotifyLink(l ipld.Link)

func (NoopExchange) Pull added in v0.5.4

func (n NoopExchange) Pull(_ context.Context, from peer.ID, l ipld.Link) error

func (NoopExchange) Push added in v0.5.4

func (n NoopExchange) Push(_ context.Context, to peer.ID, l ipld.Link) error

func (NoopExchange) SetAuth added in v0.8.3

func (n NoopExchange) SetAuth(_ context.Context, on peer.ID, subject peer.ID, allow bool) error

func (NoopExchange) Shutdown added in v0.5.4

func (n NoopExchange) Shutdown(context.Context) error

func (NoopExchange) Start added in v0.5.4

func (n NoopExchange) Start(context.Context) error

type Option added in v0.8.3

type Option func(*options) error

func WithAllowTransientConnection added in v0.8.4

func WithAllowTransientConnection(t bool) Option

func WithAuthorizedPeers added in v1.14.0

func WithAuthorizedPeers(l []peer.ID) Option

func WithAuthorizer added in v0.8.3

func WithAuthorizer(a peer.ID) Option

WithAuthorizer sets the peer ID that has permission to configure DAG exchange authorization. Defaults to authorization disabled.

func WithDhtProviderOptions added in v1.16.0

func WithDhtProviderOptions(d ...dht.Option) Option

func WithIpniProviderEngineOptions added in v1.0.0

func WithIpniProviderEngineOptions(e ...engine.Option) Option

func WithIpniPublishChanBuffer added in v1.0.0

func WithIpniPublishChanBuffer(s int) Option

func WithIpniPublishDisabled added in v1.0.0

func WithIpniPublishDisabled(d bool) Option

func WithIpniPublishInterval added in v1.0.0

func WithIpniPublishInterval(t time.Duration) Option

func WithIpniPublishMaxBatchSize added in v1.0.0

func WithIpniPublishMaxBatchSize(s int) Option

func WithUpdateConfig added in v1.14.0

func WithUpdateConfig(updateConfig ConfigUpdater) Option

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL