exchange

package
v1.21.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 9, 2023 License: MIT Imports: 34 Imported by: 0

Documentation

Overview

Example (PoolExchangeDagBetweenPoolNodes)

Example_poolExchangeDagBetweenPoolNodes starts up a pool with 2 nodes, stores a sample DAG on the first node and pulls it from the second node, then pushes a second sample DAG from the second node back to the first.

// Start a mock server on 127.0.0.1:4001 and shut it down when the example ends.
server := startMockServer("127.0.0.1:4001")
defer func() {
	// Shutdown the server after test
	if err := server.Shutdown(context.Background()); err != nil {
		panic(err) // Handle the error as you see fit
	}
}()

const poolName = "1"

// Bound the whole example so that the polling loop below cannot hang forever.
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

// Elevate log level to show internal communications.
if err := logging.SetLogLevel("*", "info"); err != nil {
	panic(err)
}

// generateIdentity is called with fixed arguments (1 and 2); the expected
// output below relies on the resulting peer IDs being stable across runs.

// Instantiate the first node in the pool
h1, err := libp2p.New(libp2p.Identity(generateIdentity(1)))
if err != nil {
	panic(err)
}
n1, err := blox.New(blox.WithPoolName(poolName), blox.WithHost(h1))
if err != nil {
	panic(err)
}
if err := n1.Start(ctx); err != nil {
	panic(err)
}
defer n1.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h1.ID().String())

// Instantiate the second node in the pool
h2, err := libp2p.New(libp2p.Identity(generateIdentity(2)))
if err != nil {
	panic(err)
}
n2, err := blox.New(blox.WithPoolName(poolName), blox.WithHost(h2))
if err != nil {
	panic(err)
}
if err := n2.Start(ctx); err != nil {
	panic(err)
}
defer n2.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h2.ID().String())

// Connect the two hosts in both directions.
h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL)
if err = h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil {
	panic(err)
}
h2.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), peerstore.PermanentAddrTTL)
if err = h2.Connect(ctx, peer.AddrInfo{ID: h1.ID(), Addrs: h1.Addrs()}); err != nil {
	panic(err)
}

// Authorize exchange between the two nodes
if err := n1.SetAuth(ctx, h1.ID(), h2.ID(), true); err != nil {
	panic(err)
}
if err := n2.SetAuth(ctx, h2.ID(), h1.ID(), true); err != nil {
	panic(err)
}

// Generate a sample DAG and store it on node 1 (n1) in the pool; n2 will pull it from n1.
n1leaf := fluent.MustBuildMap(basicnode.Prototype.Map, 1, func(na fluent.MapAssembler) {
	na.AssembleEntry("this").AssignBool(true)
})
n1leafLink, err := n1.Store(ctx, n1leaf)
if err != nil {
	panic(err)
}
n1Root := fluent.MustBuildMap(basicnode.Prototype.Map, 2, func(na fluent.MapAssembler) {
	na.AssembleEntry("that").AssignInt(42)
	na.AssembleEntry("oneLeafLink").AssignLink(n1leafLink)
})
n1RootLink, err := n1.Store(ctx, n1Root)
if err != nil {
	panic(err)
}
fmt.Printf("%s stored IPLD data with links:\n    root: %s\n    leaf:%s\n", h1.ID(), n1RootLink, n1leafLink)

// Generate a sample DAG and store it on node 2 (n2) in the pool; n2 will push it to n1.
n2leaf := fluent.MustBuildMap(basicnode.Prototype.Map, 1, func(na fluent.MapAssembler) {
	na.AssembleEntry("that").AssignBool(false)
})
n2leafLink, err := n2.Store(ctx, n2leaf)
if err != nil {
	panic(err)
}
n2Root := fluent.MustBuildMap(basicnode.Prototype.Map, 2, func(na fluent.MapAssembler) {
	na.AssembleEntry("this").AssignInt(24)
	na.AssembleEntry("anotherLeafLink").AssignLink(n2leafLink)
})
n2RootLink, err := n2.Store(ctx, n2Root)
if err != nil {
	panic(err)
}
// Report the DAG that was just stored on n2 (its own host ID and links).
fmt.Printf("%s stored IPLD data with links:\n    root: %s\n    leaf:%s\n", h2.ID(), n2RootLink, n2leafLink)

fmt.Println("exchanging by Pull...")
// Pull the sample DAG stored on node 1 from node 2 by only asking for the root link.
// Because fetch implementation is recursive, it should fetch the leaf link too.
if err := n2.Pull(ctx, h1.ID(), n1RootLink); err != nil {
	panic(err)
}

// Assert that n2 now has both root and leaf links
if exists, err := n2.Has(ctx, n1RootLink); err != nil {
	panic(err)
} else if !exists {
	panic("expected n2 to have fetched the entire sample DAG")
} else {
	fmt.Printf("%s successfully fetched:\n    link: %s\n    from %s\n", h2.ID(), n1RootLink, h1.ID())
	n, err := n2.Load(ctx, n1RootLink, basicnode.Prototype.Any)
	if err != nil {
		panic(err)
	}
	var buf bytes.Buffer
	if err := dagjson.Encode(n, &buf); err != nil {
		panic(err)
	}
	fmt.Printf("    content: %s\n", buf.String())
}
if exists, err := n2.Has(ctx, n1leafLink); err != nil {
	panic(err)
} else if !exists {
	panic("expected n2 to have fetched the entire sample DAG")
} else {
	fmt.Printf("%s successfully fetched:\n    link: %s\n    from %s\n", h2.ID(), n1leafLink, h1.ID())
	n, err := n2.Load(ctx, n1leafLink, basicnode.Prototype.Any)
	if err != nil {
		panic(err)
	}
	var buf bytes.Buffer
	if err := dagjson.Encode(n, &buf); err != nil {
		panic(err)
	}
	fmt.Printf("    content: %s\n", buf.String())
}

fmt.Println("exchanging by Push...")
// Push the sample DAG stored on node 2 to node 1 by only pushing the root link.
// Because Push implementation is recursive, it should push the leaf link too.
if err := n2.Push(ctx, h1.ID(), n2RootLink); err != nil {
	panic(err)
}

// Since push is an asynchronous operation, wait until background push is finished
// by periodically checking if link is present on node 1.
for {
	// A lookup error is deliberately treated like "not there yet": the loop
	// retries until the context deadline, and the assertion after the loop
	// surfaces any persistent error.
	if exists, _ := n1.Has(ctx, n2RootLink); exists {
		break
	}
	// Wait one second between checks, but stop immediately once ctx is done.
	select {
	case <-ctx.Done():
		panic(ctx.Err())
	case <-time.After(time.Second):
	}
}

// Assert that n1 now has both root and leaf links
if exists, err := n1.Has(ctx, n2RootLink); err != nil {
	panic(err)
} else if !exists {
	panic("expected n2 to have pushed the entire sample DAG")
} else {
	fmt.Printf("%s successfully pushed:\n    link: %s\n    to %s\n", h2.ID(), n2RootLink, h1.ID())
	n, err := n1.Load(ctx, n2RootLink, basicnode.Prototype.Any)
	if err != nil {
		panic(err)
	}
	var buf bytes.Buffer
	if err := dagjson.Encode(n, &buf); err != nil {
		panic(err)
	}
	fmt.Printf("    content: %s\n", buf.String())
}
if exists, err := n1.Has(ctx, n2leafLink); err != nil {
	panic(err)
} else if !exists {
	panic("expected n2 to have pushed the entire sample DAG")
} else {
	fmt.Printf("%s successfully pushed:\n    link: %s\n    to %s\n", h2.ID(), n2leafLink, h1.ID())
	n, err := n1.Load(ctx, n2leafLink, basicnode.Prototype.Any)
	if err != nil {
		panic(err)
	}
	var buf bytes.Buffer
	if err := dagjson.Encode(n, &buf); err != nil {
		panic(err)
	}
	fmt.Printf("    content: %s\n", buf.String())
}
Output:

Instantiated node in pool 1 with ID: 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
Instantiated node in pool 1 with ID: 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX
12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM stored IPLD data with links:
    root: bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy
    leaf:bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34
12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX stored IPLD data with links:
    root: bafyreiekrzm6lpylw7hhrgvmzqfsek7og6ucqgpzns3ysbc5wj3imfrsge
    leaf:bafyreibzxn3zdk6e53h7cvx2sfbbroozp5e3kuvz6t4jfo2hfu4ic2ooc4
exchanging by Pull...
12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX successfully fetched:
    link: bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy
    from 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
    content: {"oneLeafLink":{"/":"bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34"},"that":42}
12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX successfully fetched:
    link: bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34
    from 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
    content: {"this":true}
exchanging by Push...
12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX successfully pushed:
    link: bafyreiekrzm6lpylw7hhrgvmzqfsek7og6ucqgpzns3ysbc5wj3imfrsge
    to 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
    content: {"anotherLeafLink":{"/":"bafyreibzxn3zdk6e53h7cvx2sfbbroozp5e3kuvz6t4jfo2hfu4ic2ooc4"},"this":24}
12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX successfully pushed:
    link: bafyreibzxn3zdk6e53h7cvx2sfbbroozp5e3kuvz6t4jfo2hfu4ic2ooc4
    to 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
    content: {"that":false}
Example (ProvideAfterPull)
// Start a mock server on 127.0.0.1:4001 (the address every node below is given
// as its blockchain endpoint) and shut it down when the example ends.
server := startMockServer("127.0.0.1:4001")
defer func() {
	// Shutdown the server after test
	if err := server.Shutdown(context.Background()); err != nil {
		panic(err) // Handle the error as you see fit
	}
}()

const poolName = "1"

// Relay address handed to every node; kept in one place instead of being
// repeated per node.
const relayAddr = "/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"

// Bound the whole example so that the polling loops below cannot hang forever.
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

// Elevate log level to show internal communications.
if err := logging.SetLogLevel("*", "info"); err != nil {
	panic(err)
}

// generateIdentity is called with fixed arguments (1 through 4); the expected
// output below relies on the resulting peer IDs being stable across runs.

// Instantiate the first node in the pool
h1, err := libp2p.New(libp2p.Identity(generateIdentity(1)))
if err != nil {
	panic(err)
}
n1, err := blox.New(
	blox.WithPoolName(poolName),
	blox.WithTopicName(poolName),
	blox.WithHost(h1),
	blox.WithUpdatePoolName(updatePoolName),
	blox.WithBlockchainEndPoint("127.0.0.1:4001"),
	blox.WithRelays([]string{relayAddr}),
	blox.WithExchangeOpts(
		exchange.WithDhtProviderOptions(
			dht.ProtocolExtension(protocol.ID("/"+poolName)),
			dht.ProtocolPrefix("/fula"),
			dht.Resiliency(1),
			dht.Mode(dht.ModeAutoServer),
		),
	),
)
if err != nil {
	panic(err)
}
if err := n1.Start(ctx); err != nil {
	panic(err)
}
defer n1.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h1.ID().String())

// Instantiate the second node in the pool
h2, err := libp2p.New(libp2p.Identity(generateIdentity(2)))
if err != nil {
	panic(err)
}
n2, err := blox.New(
	blox.WithPoolName(poolName),
	blox.WithTopicName(poolName),
	blox.WithHost(h2),
	blox.WithUpdatePoolName(updatePoolName),
	blox.WithBlockchainEndPoint("127.0.0.1:4001"),
	blox.WithRelays([]string{relayAddr}),
	blox.WithExchangeOpts(
		exchange.WithDhtProviderOptions(
			dht.ProtocolExtension(protocol.ID("/"+poolName)),
			dht.ProtocolPrefix("/fula"),
			dht.Resiliency(1),
			dht.Mode(dht.ModeAutoServer),
		),
	),
)
if err != nil {
	panic(err)
}
if err := n2.Start(ctx); err != nil {
	panic(err)
}
defer n2.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h2.ID().String())

// Instantiate the third node in the pool
h3, err := libp2p.New(libp2p.Identity(generateIdentity(3)))
if err != nil {
	panic(err)
}
n3, err := blox.New(
	blox.WithPoolName(poolName),
	blox.WithTopicName(poolName),
	blox.WithHost(h3),
	blox.WithUpdatePoolName(updatePoolName),
	blox.WithBlockchainEndPoint("127.0.0.1:4001"),
	blox.WithRelays([]string{relayAddr}),
	blox.WithExchangeOpts(
		exchange.WithDhtProviderOptions(
			dht.ProtocolExtension(protocol.ID("/"+poolName)),
			dht.ProtocolPrefix("/fula"),
			dht.Resiliency(1),
			dht.Mode(dht.ModeAutoServer),
		),
	),
)
if err != nil {
	panic(err)
}
if err := n3.Start(ctx); err != nil {
	panic(err)
}
defer n3.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h3.ID().String())

// Instantiate the fourth node not in the pool
h4, err := libp2p.New(libp2p.Identity(generateIdentity(4)))
if err != nil {
	panic(err)
}
// NOTE(review): n4 uses pool and topic "0", but its DHT protocol extension is
// still built from poolName ("1"), the same as the other three nodes — confirm
// this is intended.
n4, err := blox.New(
	blox.WithPoolName("0"),
	blox.WithTopicName("0"),
	blox.WithHost(h4),
	blox.WithUpdatePoolName(updatePoolName),
	blox.WithBlockchainEndPoint("127.0.0.1:4001"),
	blox.WithRelays([]string{relayAddr}),
	blox.WithExchangeOpts(
		exchange.WithDhtProviderOptions(
			dht.ProtocolExtension(protocol.ID("/"+poolName)),
			dht.ProtocolPrefix("/fula"),
			dht.Resiliency(1),
			dht.Mode(dht.ModeAutoServer),
		),
	),
)
if err != nil {
	panic(err)
}
if err := n4.Start(ctx); err != nil {
	panic(err)
}
defer n4.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", "0", h4.ID().String())

// Connect n1 to n2 and n3 so that there is a path for gossip propagation.
// Note that we are not connecting n2 to n3 as they should discover
// each other via pool's iexist announcements.
h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL)
if err = h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil {
	panic(err)
}
h1.Peerstore().AddAddrs(h3.ID(), h3.Addrs(), peerstore.PermanentAddrTTL)
if err = h1.Connect(ctx, peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()}); err != nil {
	panic(err)
}

// Wait until the nodes discover each other
for {
	if len(h1.Peerstore().Peers()) >= 3 &&
		len(h2.Peerstore().Peers()) >= 3 &&
		len(h3.Peerstore().Peers()) >= 3 {
		break
	}
	// Wait one second between checks, but stop immediately once ctx is done.
	select {
	case <-ctx.Done():
		panic(ctx.Err())
	case <-time.After(time.Second):
	}
}

fmt.Printf("Finally %s peerstore contains >=3 nodes:\n", h1.ID())

fmt.Printf("Finally %s peerstore contains >=3 nodes:\n", h2.ID())

fmt.Printf("Finally %s peerstore contains >=3 nodes:\n", h3.ID())

// h4 is not in the same pool, so connect each pool member to it manually.
h1.Peerstore().AddAddrs(h4.ID(), h4.Addrs(), peerstore.PermanentAddrTTL)
if err = h1.Connect(ctx, peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}); err != nil {
	panic(err)
}
h2.Peerstore().AddAddrs(h4.ID(), h4.Addrs(), peerstore.PermanentAddrTTL)
if err = h2.Connect(ctx, peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}); err != nil {
	panic(err)
}
h3.Peerstore().AddAddrs(h4.ID(), h4.Addrs(), peerstore.PermanentAddrTTL)
if err = h3.Connect(ctx, peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}); err != nil {
	panic(err)
}

// Wait until the fourth node discovers the others
for {
	if len(h4.Peerstore().Peers()) >= 4 {
		break
	}
	// Wait one second between checks, but stop immediately once ctx is done.
	select {
	case <-ctx.Done():
		panic(ctx.Err())
	case <-time.After(time.Second):
	}
}

// Store a link on n1 and find its providers from n2.

// Generate a sample DAG and store it on node 1 (n1) in the pool; n2 will pull it from n1.
n1leaf := fluent.MustBuildMap(basicnode.Prototype.Map, 1, func(na fluent.MapAssembler) {
	na.AssembleEntry("this").AssignBool(true)
})
n1leafLink, err := n1.Store(ctx, n1leaf)
if err != nil {
	panic(err)
}
n1Root := fluent.MustBuildMap(basicnode.Prototype.Map, 2, func(na fluent.MapAssembler) {
	na.AssembleEntry("that").AssignInt(42)
	na.AssembleEntry("oneLeafLink").AssignLink(n1leafLink)
})
n1RootLink, err := n1.Store(ctx, n1Root)
if err != nil {
	panic(err)
}
fmt.Printf("%s stored IPLD data with links:\n    root: %s\n    leaf:%s\n", h1.ID(), n1RootLink, n1leafLink)

// Generate a sample DAG and store it on node 2 (n2) in the pool; n2 will push part of it to n1.
n2leaf := fluent.MustBuildMap(basicnode.Prototype.Map, 1, func(na fluent.MapAssembler) {
	na.AssembleEntry("that").AssignBool(false)
})
n2leafLink, err := n2.Store(ctx, n2leaf)
if err != nil {
	panic(err)
}
n2Root := fluent.MustBuildMap(basicnode.Prototype.Map, 2, func(na fluent.MapAssembler) {
	na.AssembleEntry("this").AssignInt(24)
	na.AssembleEntry("anotherLeafLink").AssignLink(n2leafLink)
})
n2RootLink, err := n2.Store(ctx, n2Root)
if err != nil {
	panic(err)
}
// Report the DAG that was just stored on n2 under n2's own host ID.
fmt.Printf("%s stored IPLD data with links:\n    root: %s\n    leaf:%s\n", h2.ID(), n2RootLink, n2leafLink)

//n1.UpdateDhtPeers(h2.Peerstore().Peers())
//n2.UpdateDhtPeers(h1.Peerstore().Peers())

// Announce n1's root link through the DHT, then look its providers up from n2.
err = n1.ProvideLinkByDht(n1RootLink)
if err != nil {
	fmt.Print("Error happened in ProvideLinkByDht")
	panic(err)
}
peerlist1, err := n2.FindLinkProvidersByDht(n1RootLink)
if err != nil {
	fmt.Print("Error happened in FindLinkProvidersByDht")
	panic(err)
}
// Iterate over the slice and print the peer ID of each AddrInfo
for _, addrInfo := range peerlist1 {
	fmt.Printf("Found %s on %s\n", n1RootLink, addrInfo.ID.String()) // ID.String() converts the peer ID to a string
}

err = n1.PingDht(h3.ID())
if err != nil {
	fmt.Print("Error happened in PingDht")
	panic(err)
}

fmt.Println("exchanging by Pull...")
// Pull the sample DAG stored on node 1 from node 2 by only asking for the root link.
// Because fetch implementation is recursive, it should fetch the leaf link too.
if err := n2.Pull(ctx, h1.ID(), n1RootLink); err != nil {
	panic(err)
}

// Assert that n2 now has both root and leaf links
if exists, err := n2.Has(ctx, n1RootLink); err != nil {
	panic(err)
} else if !exists {
	panic("expected n2 to have fetched the entire sample DAG")
} else {
	fmt.Printf("%s successfully fetched:\n    link: %s\n    from %s\n", h2.ID(), n1RootLink, h1.ID())
	n, err := n2.Load(ctx, n1RootLink, basicnode.Prototype.Any)
	if err != nil {
		panic(err)
	}
	var buf bytes.Buffer
	if err := dagjson.Encode(n, &buf); err != nil {
		panic(err)
	}
	fmt.Printf("    content: %s\n", buf.String())
}
if exists, err := n2.Has(ctx, n1leafLink); err != nil {
	panic(err)
} else if !exists {
	panic("expected n2 to have fetched the entire sample DAG")
} else {
	fmt.Printf("%s successfully fetched:\n    link: %s\n    from %s\n", h2.ID(), n1leafLink, h1.ID())
	n, err := n2.Load(ctx, n1leafLink, basicnode.Prototype.Any)
	if err != nil {
		panic(err)
	}
	var buf bytes.Buffer
	if err := dagjson.Encode(n, &buf); err != nil {
		panic(err)
	}
	fmt.Printf("    content: %s\n", buf.String())
}

fmt.Println("exchanging by Push...")
// Push the leaf link of the sample DAG stored on node 2 to node 1.
// NOTE(review): the pool-exchange example treats Push as asynchronous and
// polls for the link afterwards; here the provider lookup follows immediately
// — confirm the lookup cannot race the push.
if err := n2.Push(ctx, h1.ID(), n2leafLink); err != nil {
	panic(err)
}

// Look up providers of the pushed link from n3, which took no part in the push.
peerlist3, err := n3.FindLinkProvidersByDht(n2leafLink)
if err != nil {
	fmt.Print("Error happened in FindLinkProvidersByDht3")
	panic(err)
}

// Iterate over the slice and print the peer ID of each AddrInfo, one per line
for _, addrInfo := range peerlist3 {
	fmt.Printf("Found %s on %s\n", n2leafLink, addrInfo.ID.String()) // ID.String() converts the peer ID to a string
}
Output:

Instantiated node in pool 1 with ID: 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
Instantiated node in pool 1 with ID: 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX
Instantiated node in pool 1 with ID: 12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX
Instantiated node in pool 0 with ID: 12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q
Finally 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM peerstore contains >=3 nodes:
Finally 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX peerstore contains >=3 nodes:
Finally 12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX peerstore contains >=3 nodes:
12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM stored IPLD data with links:
    root: bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy
    leaf:bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34
12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX stored IPLD data with links:
    root: bafyreiekrzm6lpylw7hhrgvmzqfsek7og6ucqgpzns3ysbc5wj3imfrsge
    leaf:bafyreibzxn3zdk6e53h7cvx2sfbbroozp5e3kuvz6t4jfo2hfu4ic2ooc4
Found bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy on 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
exchanging by Pull...
12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX successfully fetched:
    link: bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy
    from 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
    content: {"oneLeafLink":{"/":"bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34"},"that":42}
12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX successfully fetched:
    link: bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34
    from 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
    content: {"this":true}
exchanging by Push...
Found bafyreibzxn3zdk6e53h7cvx2sfbbroozp5e3kuvz6t4jfo2hfu4ic2ooc4 on 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM

Index

Examples

Constants

View Source
const (
	// FxExchangeProtocolID is the identifier string "/fx.land/exchange/0.0.1".
	// NOTE(review): presumably the libp2p protocol ID the exchange registers
	// under — confirm against the implementation.
	FxExchangeProtocolID = "/fx.land/exchange/0.0.1"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type ConfigUpdater added in v1.14.0

// ConfigUpdater is a callback that receives a list of peer IDs and may return
// an error. One can be supplied to the exchange through WithUpdateConfig.
// NOTE(review): presumably invoked with the authorized peers — confirm.
type ConfigUpdater func([]peer.ID) error

type Exchange

// Exchange is the method set shared by the exchange types in this package;
// both FxExchange and NoopExchange declare every method listed here.
type Exchange interface {
	// Start takes a context and reports failure through the returned error.
	Start(context.Context) error
	// Push takes a context, a peer ID and a link. The implementations in this
	// package name the peer parameter "to".
	Push(context.Context, peer.ID, ipld.Link) error
	// Pull takes a context, a peer ID and a link. The implementations in this
	// package name the peer parameter "from".
	Pull(context.Context, peer.ID, ipld.Link) error
	// SetAuth takes a context, two peer IDs (named "on" and "subject" by the
	// implementations in this package) and an "allow" flag.
	SetAuth(context.Context, peer.ID, peer.ID, bool) error
	// Shutdown takes a context and reports failure through the returned error.
	Shutdown(context.Context) error
	// IpniNotifyLink takes a link and returns nothing.
	// NOTE(review): presumably hands the link to IPNI publishing — confirm.
	IpniNotifyLink(link ipld.Link)
}

type FxExchange added in v0.5.4

// FxExchange is the exchange type returned by NewFxExchange. All of its fields
// are unexported.
type FxExchange struct {
	// contains filtered or unexported fields
}

func NewFxExchange added in v0.5.4

func NewFxExchange(h host.Host, ls ipld.LinkSystem, o ...Option) (*FxExchange, error)

func (*FxExchange) FindProvidersDht added in v1.16.0

func (e *FxExchange) FindProvidersDht(l ipld.Link) ([]peer.AddrInfo, error)

func (*FxExchange) GetAuth added in v1.0.0

func (e *FxExchange) GetAuth(ctx context.Context) (peer.ID, error)

func (*FxExchange) GetAuthorizedPeers added in v1.14.0

func (e *FxExchange) GetAuthorizedPeers(ctx context.Context) ([]peer.ID, error)
func (e *FxExchange) IpniNotifyLink(link ipld.Link)

func (*FxExchange) PingDht added in v1.16.0

func (e *FxExchange) PingDht(p peer.ID) error

func (*FxExchange) ProvideDht added in v1.16.0

func (e *FxExchange) ProvideDht(l ipld.Link) error

func (*FxExchange) Pull added in v0.5.4

func (e *FxExchange) Pull(ctx context.Context, from peer.ID, l ipld.Link) error

func (*FxExchange) Push added in v0.5.4

func (e *FxExchange) Push(ctx context.Context, to peer.ID, l ipld.Link) error

func (*FxExchange) PutValueDht added in v1.16.0

func (e *FxExchange) PutValueDht(ctx context.Context, key string, val string) error

func (*FxExchange) SearchValueDht added in v1.16.0

func (e *FxExchange) SearchValueDht(ctx context.Context, key string) (string, error)

func (*FxExchange) SetAuth added in v0.8.3

func (e *FxExchange) SetAuth(ctx context.Context, on peer.ID, subject peer.ID, allow bool) error

func (*FxExchange) Shutdown added in v0.5.4

func (e *FxExchange) Shutdown(ctx context.Context) error

func (*FxExchange) Start added in v0.5.4

func (e *FxExchange) Start(ctx context.Context) error

func (*FxExchange) UpdateDhtPeers added in v1.16.0

func (e *FxExchange) UpdateDhtPeers(peers []peer.ID) error

type NoopExchange added in v0.5.4

// NoopExchange is a field-less type that declares the same methods as the
// Exchange interface.
// NOTE(review): going by the name its methods presumably do nothing; their
// bodies are not visible here — confirm.
type NoopExchange struct{}
func (n NoopExchange) IpniNotifyLink(l ipld.Link)

func (NoopExchange) Pull added in v0.5.4

func (n NoopExchange) Pull(_ context.Context, from peer.ID, l ipld.Link) error

func (NoopExchange) Push added in v0.5.4

func (n NoopExchange) Push(_ context.Context, to peer.ID, l ipld.Link) error

func (NoopExchange) SetAuth added in v0.8.3

func (n NoopExchange) SetAuth(_ context.Context, on peer.ID, subject peer.ID, allow bool) error

func (NoopExchange) Shutdown added in v0.5.4

func (n NoopExchange) Shutdown(context.Context) error

func (NoopExchange) Start added in v0.5.4

func (n NoopExchange) Start(context.Context) error

type Option added in v0.8.3

// Option is a function applied to the package's unexported options value; it
// may return an error. The With* functions in this package return Options, and
// NewFxExchange accepts any number of them.
type Option func(*options) error

func WithAllowTransientConnection added in v0.8.4

func WithAllowTransientConnection(t bool) Option

func WithAuthorizedPeers added in v1.14.0

func WithAuthorizedPeers(l []peer.ID) Option

func WithAuthorizer added in v0.8.3

func WithAuthorizer(a peer.ID) Option

WithAuthorizer sets the peer ID that has permission to configure DAG exchange authorization. Defaults to authorization disabled.

func WithDhtProviderOptions added in v1.16.0

func WithDhtProviderOptions(d ...dht.Option) Option

func WithIPFS added in v1.21.0

func WithIPFS(ipfsApi iface.CoreAPI) Option

func WithIpniProviderEngineOptions added in v1.0.0

func WithIpniProviderEngineOptions(e ...engine.Option) Option

func WithIpniPublishChanBuffer added in v1.0.0

func WithIpniPublishChanBuffer(s int) Option

func WithIpniPublishDisabled added in v1.0.0

func WithIpniPublishDisabled(d bool) Option

func WithIpniPublishInterval added in v1.0.0

func WithIpniPublishInterval(t time.Duration) Option

func WithIpniPublishMaxBatchSize added in v1.0.0

func WithIpniPublishMaxBatchSize(s int) Option

func WithUpdateConfig added in v1.14.0

func WithUpdateConfig(updateConfig ConfigUpdater) Option

func WithWg added in v1.21.0

func WithWg(wg *sync.WaitGroup) Option

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL