diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index 96c7d573..2b6a211b 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -8,10 +8,10 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - uses: ./.github/actions/local-s3 - name: Set up JDK 11 - uses: actions/setup-java@v3 + uses: actions/setup-java@v4 with: distribution: temurin java-version: 11 diff --git a/README.md b/README.md index 9ef6349c..fadf89d1 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,7 @@ Currently implemented properties: * bloom/[infini filtered](https://www.rasmuspagh.net/papers/infinifilter.pdf) blockstore * connect bitswap to kademlia for discovery, with a faster version with supplied peerids * configurable cid publishing function +* mDNS peer discovery * Android compatibility * example serverless chat app using p2p http proxy for Android * interop tests with other implementations - https://github.com/libp2p/test-plans/ @@ -37,7 +38,6 @@ In the future we will add: * circuit-relay * dcutr (direct connection upgrade through relay) * AutoRelay -* mDNS peer discovery * example iOS chat app * QUIC transport (and encryption and multiplexing) diff --git a/pom.xml b/pom.xml index 4bcfd4b4..19584575 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.github.peergos nabu - v0.7.9 + v0.8.0 UTF-8 @@ -16,7 +16,7 @@ 2.2 v1.4.12 v1.4.4 - 1.9.10 + 2.1.0 @@ -40,35 +40,31 @@ - - org.jetbrains.kotlin - kotlin-maven-plugin - ${kotlin.version} - true - org.apache.maven.plugins - maven-dependency-plugin - 2.6 + maven-assembly-plugin + 3.1.1 + + + + jar-with-dependencies + + + - unpack-dependencies + make-assembly package - unpack-dependencies + single - - system - META-INF/*.SF,META-INF/*.DSA,META-INF/*.RSA - ${project.build.directory}/classes - - org.apache.maven.plugins - maven-compiler-plugin - 3.10.1 + org.jetbrains.kotlin + kotlin-maven-plugin + ${kotlin.version} compile @@ -76,60 +72,51 @@ compile + + + src/main/java + target/generated-sources/annotations + + - testCompile + test-compile test-compile - testCompile + test-compile + + ${maven.compiler.target} + org.apache.maven.plugins - maven-assembly-plugin - 3.1.1 - - - - jar-with-dependencies - - - + maven-compiler-plugin + 3.10.1 - make-assembly - package + compile + compile - single + compile - - - - org.apache.maven.plugins - maven-shade-plugin - 3.3.0 - - package + testCompile + test-compile - shade + testCompile - - false - - - *:* - - META-INF/*.SF - META-INF/*.DSA - META-INF/*.RSA - - - - + + + default-compile + none + + + default-testCompile + none @@ -139,7 +126,7 @@ org.jetbrains.kotlin - kotlin-stdlib + kotlin-stdlib-jdk8 ${kotlin.version} @@ -162,7 +149,7 @@ com.github.peergos jvm-libp2p - 0.16.6 + 0.18.0-ipv6-mdns-wildcard org.slf4j diff --git a/src/main/java/org/peergos/EmbeddedIpfs.java b/src/main/java/org/peergos/EmbeddedIpfs.java index 1050b8b6..83bf987b 100644 --- a/src/main/java/org/peergos/EmbeddedIpfs.java +++ b/src/main/java/org/peergos/EmbeddedIpfs.java @@ -1,40 +1,64 @@ package org.peergos; -import io.ipfs.cid.*; -import io.ipfs.multiaddr.*; +import io.ipfs.cid.Cid; +import io.ipfs.multiaddr.MultiAddress; import io.ipfs.multihash.Multihash; -import io.libp2p.core.*; -import io.libp2p.core.crypto.*; -import io.libp2p.core.multiformats.*; -import io.libp2p.core.multistream.*; -import io.libp2p.protocol.*; -import org.peergos.blockstore.*; +import io.libp2p.core.AddressBook; +import io.libp2p.core.Host; +import io.libp2p.core.PeerId; +import io.libp2p.core.crypto.PrivKey; +import io.libp2p.core.crypto.PubKey; +import io.libp2p.core.multiformats.Multiaddr; +import io.libp2p.core.multistream.ProtocolBinding; +import io.libp2p.discovery.MDnsDiscovery; +import io.libp2p.protocol.Ping; +import org.peergos.blockstore.Blockstore; +import org.peergos.blockstore.FileBlockstore; +import org.peergos.blockstore.FilteredBlockstore; +import org.peergos.blockstore.ProvidingBlockstore; +import org.peergos.blockstore.TypeLimitedBlockstore; import org.peergos.blockstore.metadatadb.BlockMetadataStore; -import org.peergos.blockstore.metadatadb.JdbcBlockMetadataStore; import org.peergos.blockstore.metadatadb.CachingBlockMetadataStore; +import org.peergos.blockstore.metadatadb.JdbcBlockMetadataStore; import org.peergos.blockstore.metadatadb.sql.H2BlockMetadataCommands; import org.peergos.blockstore.metadatadb.sql.UncloseableConnection; import org.peergos.blockstore.s3.S3Blockstore; -import org.peergos.config.*; +import org.peergos.config.Config; +import org.peergos.config.FilterType; +import org.peergos.config.IdentitySection; import org.peergos.net.ConnectionException; -import org.peergos.protocol.*; -import org.peergos.protocol.autonat.*; -import org.peergos.protocol.bitswap.*; -import org.peergos.protocol.circuit.*; -import org.peergos.protocol.dht.*; -import org.peergos.protocol.http.*; -import org.peergos.protocol.ipns.*; +import org.peergos.protocol.IdentifyBuilder; +import org.peergos.protocol.autonat.AutonatProtocol; +import org.peergos.protocol.bitswap.Bitswap; +import org.peergos.protocol.bitswap.BitswapEngine; +import org.peergos.protocol.circuit.CircuitHopProtocol; +import org.peergos.protocol.circuit.CircuitStopProtocol; +import org.peergos.protocol.dht.Kademlia; +import org.peergos.protocol.dht.KademliaController; +import org.peergos.protocol.dht.KademliaEngine; +import org.peergos.protocol.dht.ProviderStore; +import org.peergos.protocol.dht.RamProviderStore; +import org.peergos.protocol.dht.RecordStore; +import org.peergos.protocol.http.HttpProtocol; +import org.peergos.protocol.ipns.IPNS; +import org.peergos.protocol.ipns.IpnsRecord; import org.peergos.util.Logging; -import java.nio.file.*; +import java.nio.file.Path; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; -import java.time.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.logging.*; -import java.util.stream.*; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.CompletableFuture; +import java.util.logging.Logger; +import java.util.stream.Collectors; public class EmbeddedIpfs { private static final Logger LOG = Logging.LOG(); @@ -49,6 +73,8 @@ public class EmbeddedIpfs { public final Optional p2pHttp; private final List bootstrap; private final Optional blockProvider; + private final List announce; + private final List mdns = new ArrayList<>(); public EmbeddedIpfs(Host node, Blockstore blockstore, @@ -57,7 +83,8 @@ public EmbeddedIpfs(Host node, Bitswap bitswap, Optional p2pHttp, List bootstrap, - Optional> newBlockProvider) { + Optional> newBlockProvider, + List announce) { this.node = node; this.blockstore = blockstore; this.records = records; @@ -68,6 +95,7 @@ public EmbeddedIpfs(Host node, this.blocks = new BitswapBlockService(node, bitswap, dht); this.blockProvider = newBlockProvider.map(q -> new PeriodicBlockProvider(22 * 3600_000L, () -> blockstore.refs(false).join().stream(), node, dht, q)); + this.announce = announce; } public int maxBlockSize() { @@ -138,12 +166,15 @@ public void start() { try { this.stop().join(); } catch (Exception ex) { - ex.printStackTrace(); + LOG.info(ex.getMessage()); } }); Runtime.getRuntime().addShutdownHook(shutdownHook); node.start().join(); - IdentifyBuilder.addIdentifyProtocol(node); + IdentifyBuilder.addIdentifyProtocol(node, announce.stream() + .map(MultiAddress::toString) + .map(Multiaddr::new) + .collect(Collectors.toList())); LOG.info("Node started and listening on " + node.listenAddresses()); LOG.info("Bootstrapping IPFS routing table"); if (bootstrap.isEmpty()) @@ -153,15 +184,34 @@ public void start() { dht.bootstrap(node); dht.startBootstrapThread(node); - blockProvider.ifPresent(p -> p.start()); + LOG.info("MDNS discovery enabled"); + MDnsDiscovery mdns = new MDnsDiscovery(node, "_ipfs-discovery._udp.local.", 60, null); + this.mdns.add(mdns); + mdns.addHandler(peerInfo -> { + PeerId remote = PeerId.fromBase58(peerInfo.getPeerId().toBase58().substring(1)); // Not sure what's wrong with peerInfo, but this works + if (!remote.equals(node.getPeerId())) { + LOG.info(node.getPeerId() + " found local peer: " + remote.toBase58() + ", addrs: " + peerInfo.getAddresses()); + Multiaddr[] remoteAddrs = peerInfo.getAddresses().toArray(new Multiaddr[0]); + KademliaController ctr = dht.dial(node, remote, remoteAddrs).getController().join(); + ctr.closerPeers(node.getPeerId().getBytes()).join(); + node.getAddressBook().addAddrs(remote, 0, remoteAddrs); + } + return null; + }); + mdns.start(); + + blockProvider.ifPresent(PeriodicBlockProvider::start); } public CompletableFuture stop() throws Exception { if (records != null) { records.close(); } - blockProvider.ifPresent(b -> b.stop()); + blockProvider.ifPresent(PeriodicBlockProvider::stop); dht.stopBootstrapThread(); + for (MDnsDiscovery m : mdns) { + m.stop(); + } return node != null ? node.stop() : CompletableFuture.completedFuture(null); } @@ -179,6 +229,7 @@ public static BlockMetadataStore buildBlockMetadata(Args a) { throw new RuntimeException(e); } } + public static Blockstore buildBlockStore(Config config, Path ipfsPath, BlockMetadataStore meta, boolean updateMetadb) { Blockstore withMetadb; if (config.datastore.blockMount.prefix.equals("flatfs.datastore")) { @@ -206,9 +257,9 @@ public static Blockstore typeLimited(Blockstore blocks, Config config) { public static Blockstore filteredBlockStore(Blockstore blocks, Config config) { if (config.datastore.filter.type == FilterType.BLOOM) { return FilteredBlockstore.bloomBased(blocks, config.datastore.filter.falsePositiveRate); - } else if(config.datastore.filter.type == FilterType.INFINI) { + } else if (config.datastore.filter.type == FilterType.INFINI) { return FilteredBlockstore.infiniBased(blocks, config.datastore.filter.falsePositiveRate); - } else if(config.datastore.filter.type == FilterType.NONE) { + } else if (config.datastore.filter.type == FilterType.NONE) { return blocks; } else { throw new IllegalStateException("Unhandled filter type: " + config.datastore.filter.type); @@ -224,8 +275,8 @@ public static EmbeddedIpfs build(RecordStore records, BlockRequestAuthoriser authoriser, Optional handler, boolean localEnabled) { - return build(records, blocks, provideBlocks, swarmAddresses, bootstrap, identity, authoriser, handler, localEnabled, - Optional.empty(), Optional.empty()); + return build(records, blocks, provideBlocks, swarmAddresses, bootstrap, identity, Collections.emptyList(), + authoriser, handler, localEnabled, Optional.empty(), Optional.empty()); } public static EmbeddedIpfs build(RecordStore records, @@ -234,6 +285,7 @@ public static EmbeddedIpfs build(RecordStore records, List swarmAddresses, List bootstrap, IdentitySection identity, + List announce, BlockRequestAuthoriser authoriser, Optional handler, boolean localEnabled, @@ -245,7 +297,7 @@ public static EmbeddedIpfs build(RecordStore records, ProviderStore providers = new RamProviderStore(10_000); HostBuilder builder = new HostBuilder().setIdentity(identity.privKeyProtobuf).listen(swarmAddresses); - if (! builder.getPeerId().equals(identity.peerId)) { + if (!builder.getPeerId().equals(identity.peerId)) { throw new IllegalStateException("PeerId invalid"); } Multihash ourPeerId = Multihash.deserialize(builder.getPeerId().getBytes()); @@ -268,9 +320,9 @@ public static EmbeddedIpfs build(RecordStore records, Host node = builder.addProtocols(protocols).build(); Optional> newBlockProvider = provideBlocks ? - Optional.of(((ProvidingBlockstore)blockstore).toPublish) : + Optional.of(((ProvidingBlockstore) blockstore).toPublish) : Optional.empty(); - return new EmbeddedIpfs(node, blockstore, records, dht, bitswap, httpHandler, bootstrap, newBlockProvider); + return new EmbeddedIpfs(node, blockstore, records, dht, bitswap, httpHandler, bootstrap, newBlockProvider, announce); } public static Multiaddr[] getAddresses(Host node, Kademlia dht, Multihash targetNodeId) throws ConnectionException { diff --git a/src/main/java/org/peergos/HostBuilder.java b/src/main/java/org/peergos/HostBuilder.java index 58621917..7c19121c 100644 --- a/src/main/java/org/peergos/HostBuilder.java +++ b/src/main/java/org/peergos/HostBuilder.java @@ -1,25 +1,40 @@ package org.peergos; -import io.ipfs.multiaddr.*; +import io.ipfs.multiaddr.MultiAddress; import io.ipfs.multihash.Multihash; -import io.libp2p.core.*; -import io.libp2p.core.crypto.*; -import io.libp2p.core.dsl.*; -import io.libp2p.core.multiformats.*; -import io.libp2p.core.multistream.*; -import io.libp2p.core.mux.*; -import io.libp2p.crypto.keys.*; -import io.libp2p.protocol.*; -import io.libp2p.security.noise.*; -import io.libp2p.transport.tcp.*; +import io.libp2p.core.ConnectionHandler; +import io.libp2p.core.Host; +import io.libp2p.core.PeerId; +import io.libp2p.core.StreamPromise; import io.libp2p.core.crypto.KeyKt; -import org.peergos.blockstore.*; -import org.peergos.protocol.autonat.*; -import org.peergos.protocol.bitswap.*; -import org.peergos.protocol.circuit.*; -import org.peergos.protocol.dht.*; -import java.util.*; -import java.util.stream.*; +import io.libp2p.core.crypto.PrivKey; +import io.libp2p.core.dsl.Builder; +import io.libp2p.core.dsl.BuilderJKt; +import io.libp2p.core.multiformats.Multiaddr; +import io.libp2p.core.multistream.ProtocolBinding; +import io.libp2p.core.mux.StreamMuxerProtocol; +import io.libp2p.crypto.keys.Ed25519Kt; +import io.libp2p.protocol.IdentifyBinding; +import io.libp2p.protocol.IdentifyController; +import io.libp2p.protocol.IdentifyProtocol; +import io.libp2p.protocol.Ping; +import io.libp2p.security.noise.NoiseXXSecureChannel; +import io.libp2p.transport.tcp.TcpTransport; +import org.peergos.blockstore.Blockstore; +import org.peergos.protocol.autonat.AutonatProtocol; +import org.peergos.protocol.bitswap.Bitswap; +import org.peergos.protocol.bitswap.BitswapEngine; +import org.peergos.protocol.circuit.CircuitHopProtocol; +import org.peergos.protocol.circuit.CircuitStopProtocol; +import org.peergos.protocol.dht.Kademlia; +import org.peergos.protocol.dht.KademliaEngine; +import org.peergos.protocol.dht.ProviderStore; +import org.peergos.protocol.dht.RecordStore; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; public class HostBuilder { private PrivKey privKey; @@ -46,21 +61,21 @@ public List getProtocols() { public Optional getWanDht() { return protocols.stream() .filter(p -> p instanceof Kademlia && p.getProtocolDescriptor().getAnnounceProtocols().contains("/ipfs/kad/1.0.0")) - .map(p -> (Kademlia)p) + .map(p -> (Kademlia) p) .findFirst(); } public Optional getBitswap() { return protocols.stream() .filter(p -> p instanceof Bitswap) - .map(p -> (Bitswap)p) + .map(p -> (Bitswap) p) .findFirst(); } public Optional getRelayHop() { return protocols.stream() .filter(p -> p instanceof CircuitHopProtocol.Binding) - .map(p -> (CircuitHopProtocol.Binding)p) + .map(p -> (CircuitHopProtocol.Binding) p) .findFirst(); } @@ -93,7 +108,6 @@ public HostBuilder setIdentity(byte[] privKey) { } - public HostBuilder setPrivKey(PrivKey privKey) { this.privKey = privKey; this.peerId = PeerId.fromPubKey(privKey.publicKey()); @@ -182,7 +196,7 @@ public static Host build(PrivKey privKey, if (connection.isInitiator()) return; addrs.getAddrs(remotePeer).thenAccept(existing -> { - if (! existing.isEmpty()) + if (!existing.isEmpty()) return; StreamPromise stream = connection.muxerSession() .createStream(new IdentifyBinding(new IdentifyProtocol())); @@ -217,7 +231,7 @@ public static Host build(PrivKey privKey, }); for (ProtocolBinding protocol : protocols) { if (protocol instanceof HostConsumer) - ((HostConsumer)protocol).setHost(host); + ((HostConsumer) protocol).setHost(host); } return host; } diff --git a/src/main/java/org/peergos/IpnsPublisher.java b/src/main/java/org/peergos/IpnsPublisher.java index 77f5b3d7..0f9aeacd 100644 --- a/src/main/java/org/peergos/IpnsPublisher.java +++ b/src/main/java/org/peergos/IpnsPublisher.java @@ -1,27 +1,39 @@ package org.peergos; -import io.ipfs.multiaddr.*; -import io.ipfs.multihash.*; -import io.libp2p.core.*; -import io.libp2p.core.crypto.*; -import io.libp2p.crypto.keys.*; -import org.peergos.blockstore.*; -import org.peergos.config.*; -import org.peergos.protocol.dht.*; -import org.peergos.protocol.ipns.*; -import org.peergos.util.*; +import io.ipfs.multiaddr.MultiAddress; +import io.ipfs.multihash.Multihash; +import io.libp2p.core.PeerId; +import io.libp2p.core.crypto.KeyKt; +import io.libp2p.core.crypto.PrivKey; +import io.libp2p.crypto.keys.Ed25519Kt; +import org.peergos.blockstore.RamBlockstore; +import org.peergos.config.Config; +import org.peergos.config.IdentitySection; +import org.peergos.protocol.dht.RamRecordStore; +import org.peergos.protocol.ipns.IPNS; +import org.peergos.protocol.ipns.IpnsRecord; +import org.peergos.util.ArrayOps; -import java.io.*; -import java.nio.file.*; -import java.time.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; -import java.util.stream.*; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.time.LocalDateTime; +import java.util.List; +import java.util.Optional; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.Stream; public class IpnsPublisher { private static final ExecutorService ioExec = Executors.newFixedThreadPool(50); + public static void main(String[] a) throws Exception { Path publishFile = Paths.get("publishers.txt"); EmbeddedIpfs publisher = startIpfs(); @@ -36,7 +48,7 @@ public static void main(String[] a) throws Exception { .collect(Collectors.toList()); System.out.println("Resolving " + records.size() + " keys"); - for (int c=0; c < 100; c++) { + for (int c = 0; c < 100; c++) { long t0 = System.currentTimeMillis(); List recordCounts = resolveAndRepublish(records, resolver, publisher); Path resultsFile = Paths.get("publish-resolve-counts-" + LocalDateTime.now().withNano(0) + ".txt"); @@ -75,7 +87,7 @@ public static void main(String[] a) throws Exception { System.out.println("published " + done + " / " + keycount); } long t1 = System.currentTimeMillis(); - System.out.println("Published all in " + (t1-t0)/1000 + "s"); + System.out.println("Published all in " + (t1 - t0) / 1000 + "s"); publisher.stop().join(); } System.exit(0); diff --git a/src/main/java/org/peergos/PeerAddresses.java b/src/main/java/org/peergos/PeerAddresses.java index f2afb379..c2efe8c8 100644 --- a/src/main/java/org/peergos/PeerAddresses.java +++ b/src/main/java/org/peergos/PeerAddresses.java @@ -15,6 +15,7 @@ import java.util.*; import java.util.function.*; import java.util.stream.*; +import java.util.stream.Stream; public class PeerAddresses { public final Multihash peerId; @@ -57,7 +58,13 @@ public static PeerAddresses fromProtobuf(Dht.Message.Peer peer) { Multihash peerId = Multihash.deserialize(peer.getId().toByteArray()); List addrs = peer.getAddrsList() .stream() - .map(b -> Multiaddr.deserialize(b.toByteArray())) + .flatMap(b -> { + try { + return Stream.of(Multiaddr.deserialize(b.toByteArray())); + } catch (Exception e) { + return Stream.empty(); + } + }) .collect(Collectors.toList()); return new PeerAddresses(peerId, addrs); } diff --git a/src/main/java/org/peergos/RamAddressBook.java b/src/main/java/org/peergos/RamAddressBook.java index b871d27b..8de76b9f 100644 --- a/src/main/java/org/peergos/RamAddressBook.java +++ b/src/main/java/org/peergos/RamAddressBook.java @@ -1,19 +1,28 @@ package org.peergos; -import io.libp2p.core.*; -import io.libp2p.core.multiformats.*; -import org.jetbrains.annotations.*; -import org.peergos.util.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.stream.*; +import io.libp2p.core.AddressBook; +import io.libp2p.core.PeerId; +import io.libp2p.core.multiformats.Multiaddr; +import io.libp2p.core.multiformats.MultiaddrComponent; +import io.libp2p.core.multiformats.Protocol; +import org.jetbrains.annotations.NotNull; +import org.peergos.util.LRUCache; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; import java.util.stream.Stream; public class RamAddressBook implements AddressBook { - // TODO: Does this need to be public? - public Map> addresses; + private final Map> addresses; public RamAddressBook() { addresses = Collections.synchronizedMap(new LRUCache<>(10_000)); diff --git a/src/main/java/org/peergos/blockstore/CidInfiniFilter.java b/src/main/java/org/peergos/blockstore/CidInfiniFilter.java index 0d6327be..4ac10e83 100644 --- a/src/main/java/org/peergos/blockstore/CidInfiniFilter.java +++ b/src/main/java/org/peergos/blockstore/CidInfiniFilter.java @@ -35,13 +35,18 @@ public static CidInfiniFilter build(Blockstore bs) { public static CidInfiniFilter build(Blockstore bs, double falsePositiveRate) { List refs = bs.refs(false).join(); int nBlocks = refs.size()*5/4; // increase by 25% to avoid expansion during build + CidInfiniFilter infini = build(nBlocks, falsePositiveRate); + refs.forEach(c -> infini.add(c)); + return infini; + } + + public static CidInfiniFilter build(int nBlocks, double falsePositiveRate) { int nextPowerOfTwo = Math.max(17, (int) (1 + Math.log(nBlocks) / Math.log(2))); double expansionAlpha = 0.8; int bitsPerEntry = (int)(4 - Math.log(falsePositiveRate / expansionAlpha) / Math.log(2) + 1); LOG.info("Using infini filter of initial size " + ((double)(bitsPerEntry * (1 << nextPowerOfTwo) / 8) / 1024 / 1024) + " MiB"); ChainedInfiniFilter infini = new ChainedInfiniFilter(nextPowerOfTwo, bitsPerEntry); infini.set_expand_autonomously(true); - refs.forEach(c -> infini.insert(c.toBytes(), true)); return new CidInfiniFilter(infini); } } diff --git a/src/main/java/org/peergos/blockstore/FilteredBlockstore.java b/src/main/java/org/peergos/blockstore/FilteredBlockstore.java index ab58a2bd..e2ef0adc 100644 --- a/src/main/java/org/peergos/blockstore/FilteredBlockstore.java +++ b/src/main/java/org/peergos/blockstore/FilteredBlockstore.java @@ -8,27 +8,48 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.*; public class FilteredBlockstore implements Blockstore { private final Blockstore blocks; - private final Filter filter; + private final Filter present; + private volatile Filter absent; + private final AtomicLong absentCount = new AtomicLong(0); - public FilteredBlockstore(Blockstore blocks, Filter filter) { + public FilteredBlockstore(Blockstore blocks, Filter present) { this.blocks = blocks; - this.filter = filter; + this.present = present; + this.absent = buildAbsentFilter(); } public CompletableFuture bloomAdd(Cid cid) { - filter.add(cid); + present.add(cid); return CompletableFuture.completedFuture(true); } + private static Filter buildAbsentFilter() { + return CidInfiniFilter.build(1_000, 0.001); + } + + private void addAbsentBlock(Cid c) { + if (absentCount.get() > 10_000) { + absentCount.set(0); + absent = buildAbsentFilter(); + } + absentCount.incrementAndGet(); + absent.add(c); + } + @Override public CompletableFuture has(Cid c) { - if (filter.has(c)) - return blocks.has(c); + if (present.has(c) && ! absent.has(c)) + return blocks.has(c).thenApply(res -> { + if (! res) + addAbsentBlock(c); + return res; + }); return CompletableFuture.completedFuture(false); } @@ -40,15 +61,19 @@ public CompletableFuture hasAny(Multihash h) { @Override public CompletableFuture> get(Cid c) { - if (filter.has(c)) - return blocks.get(c); + if (present.has(c) && ! absent.has(c)) { + return blocks.get(c).exceptionally(t -> { + addAbsentBlock(c); + return Optional.empty(); + }); + } return CompletableFuture.completedFuture(Optional.empty()); } @Override public CompletableFuture put(byte[] block, Cid.Codec codec) { return blocks.put(block, codec) - .thenApply(filter::add); + .thenApply(present::add); } @Override diff --git a/src/main/java/org/peergos/blockstore/s3/S3Blockstore.java b/src/main/java/org/peergos/blockstore/s3/S3Blockstore.java index aff3b92d..e80a746a 100644 --- a/src/main/java/org/peergos/blockstore/s3/S3Blockstore.java +++ b/src/main/java/org/peergos/blockstore/s3/S3Blockstore.java @@ -10,16 +10,27 @@ import org.peergos.blockstore.metadatadb.BlockMetadata; import org.peergos.blockstore.metadatadb.BlockMetadataStore; import org.peergos.cbor.CborObject; +import org.peergos.util.ArrayOps; +import org.peergos.util.Futures; import org.peergos.util.Hasher; -import org.peergos.util.*; +import org.peergos.util.HttpUtil; +import org.peergos.util.Logging; +import org.peergos.util.Pair; import javax.net.ssl.SSLException; -import java.io.*; +import java.io.FileNotFoundException; +import java.io.IOException; import java.net.SocketTimeoutException; import java.nio.file.Files; import java.nio.file.Path; import java.time.ZonedDateTime; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; @@ -28,7 +39,9 @@ import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; -import java.util.stream.*; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; public class S3Blockstore implements Blockstore { @@ -130,13 +143,16 @@ public S3Blockstore(Map params, BlockMetadataStore blockMetadata hasher = new Hasher(); LOG.info("Using S3BlockStore"); } + private String getHost() { return bucket + "." + regionEndpoint; } + private boolean useHttps() { String host = getHost(); - return ! host.endsWith("localhost") && ! host.contains("localhost:"); + return !host.endsWith("localhost") && !host.contains("localhost:"); } + private Map readKeysFromAwsConfig() { String filePath = System.getenv("HOME") + "/" + ".aws/credentials"; LOG.info("Reading [default] config from: " + filePath); @@ -146,19 +162,19 @@ private Map readKeysFromAwsConfig() { boolean foundDefaultSection = false; boolean foundAccessKey = false; boolean foundSecretKey = false; - for(String line : lines) { + for (String line : lines) { String trimmedLine = line.trim(); - if(!foundDefaultSection) { - if (trimmedLine.equals("[default]")){ + if (!foundDefaultSection) { + if (trimmedLine.equals("[default]")) { foundDefaultSection = true; } } else { if (trimmedLine.startsWith("[")) { throw new IllegalStateException("Unable to read expected fields in: " + filePath); - } else if(trimmedLine.startsWith("aws_access_key_id")) { + } else if (trimmedLine.startsWith("aws_access_key_id")) { params.put("aws_access_key_id", extractParamValue(trimmedLine)); foundAccessKey = true; - } else if(trimmedLine.startsWith("aws_secret_access_key")) { + } else if (trimmedLine.startsWith("aws_secret_access_key")) { params.put("aws_secret_access_key", extractParamValue(trimmedLine)); foundSecretKey = true; } @@ -175,10 +191,11 @@ private Map readKeysFromAwsConfig() { throw new IllegalStateException("Unable to find aws_secret_access_key"); } } catch (IOException ioe) { - throw new IllegalStateException("Unable to read: " + filePath, ioe); + throw new IllegalStateException("Unable to read: " + filePath, ioe); } return params; } + private String extractParamValue(String line) { int equalsIndex = line.indexOf("="); if (equalsIndex == -1) { @@ -186,16 +203,18 @@ private String extractParamValue(String line) { } return line.substring(equalsIndex + 1).trim(); } + private String getParam(Map params, String key) { if (params.containsKey(key)) { - return ((String )params.get(key)).trim(); + return ((String) params.get(key)).trim(); } else { throw new IllegalStateException("Expecting param: " + key); } } + private String getParam(Map params, String key, String defaultValue) { if (params.containsKey(key)) { - return ((String )params.get(key)).trim(); + return ((String) params.get(key)).trim(); } else { return defaultValue; } @@ -233,7 +252,7 @@ public void updateMetadataStoreIfEmpty() { ForkJoinPool pool = new ForkJoinPool(updateParallelism); int batchSize = all.size() / updateParallelism; AtomicLong progress = new AtomicLong(0); - int tenth = batchSize/10; + int tenth = batchSize / 10; List>> futures = IntStream.range(0, updateParallelism) .mapToObj(b -> pool.submit(() -> IntStream.range(b * batchSize, (b + 1) * batchSize) @@ -256,13 +275,14 @@ public void updateMetadataStoreIfEmpty() { private static V getWithBackoff(Supplier req) { long sleep = 100; - for (int i=0; i < 20; i++) { + for (int i = 0; i < 20; i++) { try { return req.get(); } catch (RateLimitException e) { try { Thread.sleep(sleep); - } catch (InterruptedException f) {} + } catch (InterruptedException f) { + } sleep *= 2; } } @@ -294,7 +314,7 @@ private CompletableFuture> getSizeWithoutRetry(Cid cid) { Map> headRes = HttpUtil.head(headUrl.base, headUrl.fields); blockHeads.inc(); long size = Long.parseLong(headRes.get("Content-Length").get(0)); - return Futures.of(Optional.of((int)size)); + return Futures.of(Optional.of((int) size)); } catch (FileNotFoundException f) { LOG.warning("S3 404 error reading " + cid); return Futures.of(Optional.empty()); @@ -306,7 +326,7 @@ private CompletableFuture> getSizeWithoutRetry(Cid cid) { throw new RateLimitException(); } boolean notFound = msg.startsWith("NoSuchKey"); - if (! notFound) { + if (!notFound) { LOG.warning("S3 error reading " + cid); LOG.log(Level.WARNING, msg, e); } @@ -344,12 +364,12 @@ private CompletableFuture> getWithoutRetry(Cid cid) { throw new RateLimitException(); } boolean notFound = msg.startsWith("NoSuchKey"); - if (! notFound) { + if (!notFound) { LOG.warning("S3 error reading " + path); LOG.log(Level.WARNING, msg, e); } failedBlockGets.inc(); - throw new RuntimeException(e.getMessage(), e); + return Futures.errored(e); } finally { readTimer.observeDuration(); } @@ -359,6 +379,7 @@ private CompletableFuture> getWithoutRetry(Cid cid) { public CompletableFuture put(byte[] block, Cid.Codec codec) { return getWithBackoff(() -> putWithoutRetry(block, codec)); } + public CompletableFuture putWithoutRetry(byte[] block, Cid.Codec codec) { Histogram.Timer writeTimer = writeTimerLog.labels("write").startTimer(); byte[] hash = Hash.sha256(block); @@ -368,7 +389,7 @@ public CompletableFuture putWithoutRetry(byte[] block, Cid.Codec codec) { String s3Key = folder + key; Map extraHeaders = new TreeMap<>(); extraHeaders.put("Content-Type", "application/octet-stream"); - String contentHash = ArrayOps.bytesToHex(hash); + String contentHash = ArrayOps.bytesToHex(hash); PresignedUrl putUrl = S3Request.preSignPut(s3Key, block.length, contentHash, false, S3AdminRequests.asAwsDate(ZonedDateTime.now()), host, extraHeaders, region, accessKeyId, secretKey, useHttps, hasher).join(); HttpUtil.put(putUrl.base, putUrl.fields, block); diff --git a/src/main/java/org/peergos/net/APIHandler.java b/src/main/java/org/peergos/net/APIHandler.java index ad49f8f3..d4f225dd 100755 --- a/src/main/java/org/peergos/net/APIHandler.java +++ b/src/main/java/org/peergos/net/APIHandler.java @@ -1,25 +1,39 @@ package org.peergos.net; +import com.sun.net.httpserver.HttpExchange; import io.ipfs.cid.Cid; -import io.ipfs.multihash.*; +import io.ipfs.multihash.Multihash; import io.libp2p.core.PeerId; -import io.libp2p.crypto.keys.*; -import org.peergos.*; -import org.peergos.cbor.*; -import org.peergos.protocol.ipns.*; -import org.peergos.protocol.ipns.pb.*; -import org.peergos.util.*; -import com.sun.net.httpserver.HttpExchange; +import org.peergos.AggregatedMetrics; +import org.peergos.EmbeddedIpfs; +import org.peergos.HashedBlock; +import org.peergos.PeerAddresses; +import org.peergos.Want; +import org.peergos.cbor.CborObject; +import org.peergos.protocol.ipns.IpnsRecord; +import org.peergos.protocol.ipns.pb.Ipns; +import org.peergos.util.ArrayOps; +import org.peergos.util.HttpUtil; +import org.peergos.util.JSONParser; +import org.peergos.util.Logging; +import org.peergos.util.Version; -import java.io.*; -import java.nio.*; -import java.util.*; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; import java.util.logging.Logger; -import java.util.stream.*; +import java.util.stream.Collectors; public class APIHandler extends Handler { public static final String API_URL = "/api/v0/"; - public static final Version CURRENT_VERSION = Version.parse("0.7.9"); + public static final Version CURRENT_VERSION = Version.parse("0.8.0"); private static final Logger LOG = Logging.LOG(); private static final boolean LOGGING = true; @@ -52,7 +66,7 @@ public void handleCallToAPI(HttpExchange httpExchange) { long t1 = System.currentTimeMillis(); String path = httpExchange.getRequestURI().getPath(); try { - if (! path.startsWith(API_URL)) + if (!path.startsWith(API_URL)) throw new IllegalStateException("Unsupported api version, required: " + API_URL); path = path.substring(API_URL.length()); // N.B. URI.getQuery() decodes the query string @@ -64,7 +78,7 @@ public void handleCallToAPI(HttpExchange httpExchange) { AggregatedMetrics.API_ID.inc(); PeerId peerId = ipfs.node.getPeerId(); Map res = new HashMap<>(); - res.put("ID", peerId.toBase58()); + res.put("ID", peerId.toBase58()); replyJson(httpExchange, JSONParser.toString(res)); break; } @@ -90,7 +104,7 @@ public void handleCallToAPI(HttpExchange httpExchange) { .map(a -> Boolean.parseBoolean(a.get(0))) .orElse(true); List block = ipfs.getBlocks(List.of(new Want(Cid.decode(args.get(0)), auth)), peers, addToBlockstore); - if (! block.isEmpty()) { + if (!block.isEmpty()) { replyBytes(httpExchange, block.get(0).block); } else { try { @@ -104,7 +118,7 @@ public void handleCallToAPI(HttpExchange httpExchange) { case BULK_STAT: { AggregatedMetrics.API_BLOCK_STAT_BULK.inc(); Map json = (Map) JSONParser.parse(new String(readFully(httpExchange.getRequestBody()))); - List wants = ((List>)json.get("wants")) + List wants = ((List>) json.get("wants")) .stream() .map(Want::fromJson) .collect(Collectors.toList()); @@ -135,7 +149,7 @@ public void handleCallToAPI(HttpExchange httpExchange) { case PUT: { // https://docs.ipfs.tech/reference/kubo/rpc/#api-v0-block-put AggregatedMetrics.API_BLOCK_PUT.inc(); List format = params.get("format"); - Optional formatOpt = format !=null && format.size() == 1 ? Optional.of(format.get(0)) : Optional.empty(); + Optional formatOpt = format != null && format.size() == 1 ? Optional.of(format.get(0)) : Optional.empty(); if (formatOpt.isEmpty()) { throw new APIException("argument \"format\" is required"); } @@ -184,7 +198,7 @@ public void handleCallToAPI(HttpExchange httpExchange) { case RM_BULK: { AggregatedMetrics.API_BLOCK_RM_BULK.inc(); Map json = (Map) JSONParser.parse(new String(readFully(httpExchange.getRequestBody()))); - List cids = ((List)json.get("cids")) + List cids = ((List) json.get("cids")) .stream() .map(Cid::decode) .collect(Collectors.toList()); @@ -213,7 +227,7 @@ public void handleCallToAPI(HttpExchange httpExchange) { } Optional auth = Optional.ofNullable(params.get("auth")).map(a -> a.get(0)); List block = ipfs.getBlocks(List.of(new Want(Cid.decode(args.get(0)), auth)), Collections.emptySet(), false); - if (! block.isEmpty()) { + if (!block.isEmpty()) { Map res = new HashMap<>(); res.put("Size", block.get(0).block.length); replyJson(httpExchange, JSONParser.toString(res)); @@ -296,7 +310,7 @@ public void handleCallToAPI(HttpExchange httpExchange) { throw new IllegalStateException("Couldn't resolve " + signer); IpnsRecord latest = records.get(records.size() - 1); Ipns.IpnsEntry entry = Ipns.IpnsEntry.parseFrom(ByteBuffer.wrap(latest.raw)); - Map res = new HashMap<>(); + Map res = new HashMap<>(); res.put("sig", ArrayOps.bytesToHex(entry.getSignatureV2().toByteArray())); res.put("data", ArrayOps.bytesToHex(entry.getData().toByteArray())); String json = JSONParser.toString(res); @@ -319,10 +333,10 @@ public void handleCallToAPI(HttpExchange httpExchange) { } private byte[] readFully(InputStream in) throws IOException { - ByteArrayOutputStream bout = new ByteArrayOutputStream(); - byte[] b = new byte[0x1000]; + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + byte[] b = new byte[0x1000]; int nRead; - while ((nRead = in.read(b, 0, b.length)) != -1 ) + while ((nRead = in.read(b, 0, b.length)) != -1) bout.write(b, 0, nRead); in.close(); return bout.toByteArray(); diff --git a/src/main/java/org/peergos/protocol/IdentifyBuilder.java b/src/main/java/org/peergos/protocol/IdentifyBuilder.java index 402120c6..78a887f9 100644 --- a/src/main/java/org/peergos/protocol/IdentifyBuilder.java +++ b/src/main/java/org/peergos/protocol/IdentifyBuilder.java @@ -15,12 +15,12 @@ public class IdentifyBuilder { - public static void addIdentifyProtocol(Host node) { + public static void addIdentifyProtocol(Host node, List announceAddresses) { IdentifyOuterClass.Identify.Builder identifyBuilder = IdentifyOuterClass.Identify.newBuilder() .setProtocolVersion("ipfs/0.1.0") .setAgentVersion("nabu/v0.1.0") .setPublicKey(ByteArrayExtKt.toProtobuf(node.getPrivKey().publicKey().bytes())) - .addAllListenAddrs(node.listenAddresses().stream() + .addAllListenAddrs(Stream.concat(node.listenAddresses().stream(), announceAddresses.stream()) .flatMap(a -> expandWildcardAddresses(a).stream()) .map(Multiaddr::serialize) .map(ByteArrayExtKt::toProtobuf) diff --git a/src/main/java/org/peergos/protocol/bitswap/BitswapEngine.java b/src/main/java/org/peergos/protocol/bitswap/BitswapEngine.java index 3e9927c8..fb4ddb9c 100644 --- a/src/main/java/org/peergos/protocol/bitswap/BitswapEngine.java +++ b/src/main/java/org/peergos/protocol/bitswap/BitswapEngine.java @@ -158,7 +158,12 @@ public void receiveMessage(MessageOuterClass.Message msg, Stream source, Counter int presentBlocks = 0; if (msg.hasWantlist()) { for (MessageOuterClass.Message.Wantlist.Entry e : msg.getWantlist().getEntriesList()) { - Cid c = Cid.cast(e.getBlock().toByteArray()); + Cid c; + try { + c = Cid.cast(e.getBlock().toByteArray()); + } catch (Exception ex) { + continue; + } Optional auth = e.getAuth().isEmpty() ? Optional.empty() : Optional.of(ArrayOps.bytesToHex(e.getAuth().toByteArray())); boolean isCancel = e.getCancel(); boolean sendDontHave = e.getSendDontHave(); diff --git a/src/main/java/org/peergos/protocol/dht/Kademlia.java b/src/main/java/org/peergos/protocol/dht/Kademlia.java index 0e3eb9b7..89b8b039 100644 --- a/src/main/java/org/peergos/protocol/dht/Kademlia.java +++ b/src/main/java/org/peergos/protocol/dht/Kademlia.java @@ -1,27 +1,59 @@ package org.peergos.protocol.dht; -import com.offbynull.kademlia.*; -import io.ipfs.multiaddr.*; +import com.offbynull.kademlia.Id; +import io.ipfs.multiaddr.MultiAddress; import io.ipfs.multihash.Multihash; -import io.libp2p.core.*; -import io.libp2p.core.crypto.*; -import io.libp2p.core.multiformats.*; +import io.libp2p.core.AddressBook; +import io.libp2p.core.Connection; +import io.libp2p.core.ConnectionClosedException; +import io.libp2p.core.Host; +import io.libp2p.core.PeerId; +import io.libp2p.core.StreamPromise; +import io.libp2p.core.crypto.PrivKey; +import io.libp2p.core.multiformats.Multiaddr; import io.libp2p.core.multiformats.Protocol; -import io.libp2p.core.multistream.*; -import io.libp2p.etc.types.*; -import io.libp2p.protocol.*; -import org.peergos.*; -import org.peergos.protocol.dnsaddr.*; -import org.peergos.protocol.ipns.*; +import io.libp2p.core.multistream.StrictProtocolBinding; +import io.libp2p.etc.types.NonCompleteException; +import io.libp2p.etc.types.NothingToCompleteException; +import io.libp2p.protocol.Identify; +import io.libp2p.protocol.IdentifyController; +import org.peergos.AddressBookConsumer; +import org.peergos.ClientMode; +import org.peergos.Hash; +import org.peergos.PeerAddresses; +import org.peergos.Providers; +import org.peergos.protocol.dnsaddr.DnsAddr; +import org.peergos.protocol.ipns.GetResult; +import org.peergos.protocol.ipns.IPNS; +import org.peergos.protocol.ipns.IpnsMapping; +import org.peergos.protocol.ipns.IpnsRecord; import org.peergos.util.Logging; -import java.time.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; -import java.util.function.*; -import java.util.logging.*; -import java.util.stream.*; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Random; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.stream.Collectors; import java.util.stream.Stream; public class Kademlia extends StrictProtocolBinding implements AddressBookConsumer, ClientMode { @@ -82,12 +114,14 @@ public int bootstrapRoutingTable(Host host, List addrs, Predicate< try { future.get(5, TimeUnit.SECONDS); successes++; - } catch (Exception e) {} + } catch (Exception e) { + } } return successes; } private AtomicBoolean running = new AtomicBoolean(false); + public void startBootstrapThread(Host us) { running.set(true); new Thread(() -> { @@ -106,9 +140,15 @@ public void stopBootstrapThread() { running.set(false); } - public boolean connectTo(Host us, PeerAddresses peer) { + private boolean connectTo(Host us, PeerAddresses peer) { try { - new Identify().dial(us, PeerId.fromBase58(peer.peerId.toBase58()), getPublic(peer)).getController().join().id().join(); + PeerId ourPeerId = PeerId.fromBase58(peer.peerId.toBase58()); + StreamPromise conn = new Identify().dial(us, ourPeerId, getPublic(peer)); + try { + conn.getController().join().id().join(); + } finally { + conn.getStream().thenApply(s -> s.close()); + } return true; } catch (Exception e) { if (e.getCause() instanceof NothingToCompleteException || e.getCause() instanceof NonCompleteException) @@ -129,12 +169,19 @@ public void bootstrap(Host us) { // lookup our own peer id to keep our nearest neighbours up-to-date, // and connect to all of them, so they know about our addresses List closestToUs = findClosestPeers(Multihash.deserialize(us.getPeerId().getBytes()), replication, us); - int connectedClosest = 0; + List connected = new ArrayList<>(); for (PeerAddresses peer : closestToUs) { if (connectTo(us, peer)) - connectedClosest++; + connected.add(peer.peerId); + } + LOG.info("Bootstrap connected to " + connected.size() + " nodes close to us. " + connected.stream().map(Multihash::toString).sorted().limit(5).collect(Collectors.toList())); + + List allConns = us.getNetwork().getConnections(); + Set activeConns = us.getStreams().stream().map(s -> s.getConnection()).collect(Collectors.toSet()); + List toClose = allConns.stream().filter(c -> !activeConns.contains(c)).collect(Collectors.toList()); + for (Connection conn : toClose) { + conn.close(); } - LOG.fine("Bootstrap connected to " + connectedClosest + " nodes close to us."); } @Override @@ -142,6 +189,19 @@ public boolean isClient() { return this.clientMode; } + private static CompletableFuture closeAfter(CompletableFuture sf, Supplier> query) { + CompletableFuture res = new CompletableFuture<>(); + query.get().thenAccept(v -> { + sf.thenAccept(s -> s.close()); + res.complete(v); + }).exceptionally(t -> { + sf.thenAccept(s -> s.close()); + res.completeExceptionally(t); + return null; + }); + return res; + } + static class RoutingEntry { public final Id key; public final PeerAddresses addresses; @@ -170,6 +230,7 @@ public List findClosestPeers(Multihash peerIdkey, int maxCount, H byte[] key = peerIdkey.toBytes(); return findClosestPeers(key, maxCount, us); } + public List findClosestPeers(byte[] key, int maxCount, Host us) { Id keyId = Id.create(Hash.sha256(key), 256); SortedSet closest = Collections.synchronizedSortedSet(new TreeSet<>((a, b) -> compareKeys(a, b, keyId))); @@ -191,6 +252,8 @@ public List findClosestPeers(byte[] key, int maxCount, Host us) { Set queried = Collections.synchronizedSet(new HashSet<>()); int queryParallelism = alpha; + if (toQuery.isEmpty()) + LOG.info("Couldn't find any local peers in kademlia routing table"); while (true) { // The set of next query candidates. Pick as many peers from the candidate peers (closest) as the alpha concurrency factor allows. List thisRound = toQuery.stream() @@ -226,12 +289,12 @@ public List findClosestPeers(byte[] key, int maxCount, Host us) { } } } catch (Exception e) { - // couldn't contact peer + // couldn't contact peer } } // if no new peers in top k were returned we are done - if (! foundCloser) + if (!foundCloser) break; } return closest.stream() @@ -261,12 +324,16 @@ public CompletableFuture> findProviders(Multihash block, Hos List> futures = queryThisRound.stream() .parallel() .map(r -> { - KademliaController res = null; + StreamPromise conn = null; try { - res = dialPeer(r.addresses, us).join(); - return res.getProviders(block).orTimeout(2, TimeUnit.SECONDS); - }catch (Exception e) { + conn = dialPeer(r.addresses, us); + return conn.getController().join() + .getProviders(block).orTimeout(2, TimeUnit.SECONDS); + } catch (Exception e) { return null; + } finally { + if (conn != null) + conn.getStream().thenApply(s -> s.close()); } }).filter(prov -> prov != null) .collect(Collectors.toList()); @@ -284,12 +351,12 @@ public CompletableFuture> findProviders(Multihash block, Hos } } } catch (Exception e) { - if (! (e.getCause() instanceof TimeoutException)) - LOG.fine( "Timeout Exception: " + e.getMessage()); + if (!(e.getCause() instanceof TimeoutException)) + LOG.fine("Timeout Exception: " + e.getMessage()); } } // if no new peers in top k were returned we are done - if (! foundCloser) + if (!foundCloser) break; } @@ -298,17 +365,19 @@ public CompletableFuture> findProviders(Multihash block, Hos private CompletableFuture> getCloserPeers(byte[] key, PeerAddresses target, Host us) { try { - return dialPeer(target, us).orTimeout(2, TimeUnit.SECONDS).join().closerPeers(key); + StreamPromise conn = dialPeer(target, us); + KademliaController contr = conn.getController().orTimeout(2, TimeUnit.SECONDS).join(); + return closeAfter(conn.getStream(), () -> contr.closerPeers(key)); } catch (Exception e) { // we can't dial quic only nodes until it's implemented if (target.addresses.stream().allMatch(a -> a.toString().contains("quic"))) return CompletableFuture.completedFuture(Collections.emptyList()); if (e.getCause() instanceof NothingToCompleteException || e.getCause() instanceof NonCompleteException) { LOG.fine("Couldn't dial " + target.peerId + " addrs: " + target.addresses); - } else if (e.getCause() instanceof TimeoutException) + } else if (e.getCause() instanceof TimeoutException) LOG.fine("Timeout dialing " + target.peerId + " addrs: " + target.addresses); - else if (e.getCause() instanceof ConnectionClosedException) {} - else + else if (e.getCause() instanceof ConnectionClosedException) { + } else LOG.fine("Unknown error: " + e.getMessage()); } return CompletableFuture.completedFuture(Collections.emptyList()); @@ -320,26 +389,34 @@ private Multiaddr[] getPublic(PeerAddresses target) { .collect(Collectors.toList()).toArray(new Multiaddr[0]); } - private CompletableFuture dialPeer(PeerAddresses target, Host us) { + private StreamPromise dialPeer(PeerAddresses target, Host us) { Multiaddr[] multiaddrs = target.addresses.stream() .map(a -> Multiaddr.fromString(a.toString())) - .filter(a -> ! a.has(Protocol.DNS) && ! a.has(Protocol.DNS4) && ! a.has(Protocol.DNS6)) + .filter(a -> !a.has(Protocol.DNS) && !a.has(Protocol.DNS4) && !a.has(Protocol.DNS6)) .collect(Collectors.toList()).toArray(new Multiaddr[0]); - return dial(us, PeerId.fromBase58(target.peerId.toBase58()), multiaddrs).getController(); + return dial(us, PeerId.fromBase58(target.peerId.toBase58()), multiaddrs); } public CompletableFuture provideBlock(Multihash block, Host us, PeerAddresses ourAddrs) { List closestPeers = findClosestPeers(block, replication, us); List> provides = closestPeers.stream() .parallel() - .map(p -> dialPeer(p, us) - .thenCompose(contr -> contr.provide(block, ourAddrs)) - .exceptionally(t -> { - if (t.getCause() instanceof NonCompleteException) + .map(p -> { + StreamPromise conn = dialPeer(p, us); + return conn.getController() + .thenCompose(contr -> contr.provide(block, ourAddrs)) + .thenApply(res -> { + conn.getStream().thenApply(s -> s.close()); + return res; + }) + .exceptionally(t -> { + if (t.getCause() instanceof NonCompleteException) + return true; + LOG.log(Level.FINE, t, t::getMessage); + conn.getStream().thenApply(s -> s.close()); return true; - LOG.log(Level.FINE, t, t::getMessage); - return true; - })) + }); + }) .collect(Collectors.toList()); return CompletableFuture.allOf(provides.toArray(new CompletableFuture[0])); } @@ -362,14 +439,15 @@ private CompletableFuture putValue(Multihash publisher, PeerAddresses peer, Host us) { try { - return dialPeer(peer, us).join() + return dialPeer(peer, us).getController().join() .putValue(publisher, signedRecord); - } catch (Exception e) {} + } catch (Exception e) { + } return CompletableFuture.completedFuture(false); } private boolean hasTransportOverlap(PeerAddresses p) { - return p.addresses.stream().anyMatch(a -> a.has(Protocol.TCP) && ! a.has(Protocol.P2PCIRCUIT)); + return p.addresses.stream().anyMatch(a -> a.has(Protocol.TCP) && !a.has(Protocol.P2PCIRCUIT)); } public CompletableFuture publishValue(Multihash publisher, @@ -397,7 +475,7 @@ public CompletableFuture publishValue(Multihash publisher, .map(p -> new RoutingEntry(Id.create(Hash.sha256(p.peerId.toBytes()), 256), p)) .collect(Collectors.toList())); Set queried = Collections.synchronizedSet(new HashSet<>()); - while (! toQuery.isEmpty()) { + while (!toQuery.isEmpty()) { int remaining = toQuery.size() - 3; List thisRound = toQuery.stream() .filter(r -> hasTransportOverlap(r.addresses)) // don't waste time trying to dial nodes we can't @@ -410,7 +488,7 @@ public CompletableFuture publishValue(Multihash publisher, return CompletableFuture.supplyAsync(() -> getCloserPeers(key, r.addresses, us).thenApply(res -> { List more = new ArrayList<>(); for (PeerAddresses peer : res) { - if (! queried.contains(peer.peerId)) { + if (!queried.contains(peer.peerId)) { Id peerKey = Id.create(Hash.sha256(IPNS.getKey(peer.peerId)), 256); RoutingEntry e = new RoutingEntry(peerKey, peer); more.add(e); @@ -430,7 +508,8 @@ public CompletableFuture publishValue(Multihash publisher, if (publishes.size() >= minPublishes) return; toQuery.addAll(f.orTimeout(2, TimeUnit.SECONDS).join()); - } catch (Exception e) {} + } catch (Exception e) { + } }); // exit early if we have enough results if (publishes.size() >= minPublishes) @@ -440,8 +519,8 @@ public CompletableFuture publishValue(Multihash publisher, System.out.println("Publishing to further nodes, so far only " + publishes.size()); while (publishes.size() < minPublishes && !toQuery.isEmpty()) { List closest = toQuery.stream() - .limit(minPublishes - publishes.size() + 5) - .collect(Collectors.toList()); + .limit(minPublishes - publishes.size() + 5) + .collect(Collectors.toList()); List> lastFutures = closest.stream() .map(r -> { toQuery.remove(r); @@ -456,7 +535,8 @@ public CompletableFuture publishValue(Multihash publisher, lastFutures.forEach(f -> { try { f.orTimeout(2, TimeUnit.SECONDS).join(); - } catch (Exception e) {} + } catch (Exception e) { + } }); } break; @@ -475,7 +555,7 @@ public CompletableFuture resolveIpnsValue(Multihash publisher, Host us, private CompletableFuture> getValueFromPeer(PeerAddresses peer, Multihash publisher, Host us) { try { - return dialPeer(peer, us) + return dialPeer(peer, us).getController() .orTimeout(1, TimeUnit.SECONDS) .join() .getValue(publisher) @@ -485,6 +565,7 @@ private CompletableFuture> getValueFromPeer(PeerAddresses pe return CompletableFuture.completedFuture(Optional.empty()); } } + public List resolveValue(Multihash publisher, int minResults, Host us) { byte[] key = IPNS.getKey(publisher); List candidates = Collections.synchronizedList(new ArrayList<>()); @@ -501,7 +582,7 @@ public List resolveValue(Multihash publisher, int minResults, Host u .collect(Collectors.toList())); Set queried = Collections.synchronizedSet(new HashSet<>()); int countdown = 20; // TODO: Does this have to be 20 as in `replication` or unrelated? - while (! toQuery.isEmpty()) { + while (!toQuery.isEmpty()) { int remaining = toQuery.size() - 3; List thisRound = toQuery.stream() .limit(queryParallelism) @@ -530,7 +611,8 @@ public List resolveValue(Multihash publisher, int minResults, Host u return; f.orTimeout(5, TimeUnit.SECONDS).join() .orTimeout(5, TimeUnit.SECONDS).join(); - } catch (Exception e) {} + } catch (Exception e) { + } }); // exit early if we have enough results if (candidates.size() >= minResults) diff --git a/src/main/java/org/peergos/protocol/dht/KademliaEngine.java b/src/main/java/org/peergos/protocol/dht/KademliaEngine.java index 62fe6c2d..6eb164eb 100644 --- a/src/main/java/org/peergos/protocol/dht/KademliaEngine.java +++ b/src/main/java/org/peergos/protocol/dht/KademliaEngine.java @@ -198,6 +198,7 @@ public void receiveRequest(Dht.Message msg, PeerId source, Stream stream) { case PING: {break;} // Not used any more default: throw new IllegalStateException("Unknown message kademlia type: " + msg.getType()); } + stream.close(); } public static boolean isPublic(Multiaddr addr) { diff --git a/src/main/java/org/peergos/protocol/http/HttpProtocol.java b/src/main/java/org/peergos/protocol/http/HttpProtocol.java index 217c5f43..2d3fd031 100644 --- a/src/main/java/org/peergos/protocol/http/HttpProtocol.java +++ b/src/main/java/org/peergos/protocol/http/HttpProtocol.java @@ -1,8 +1,5 @@ package org.peergos.protocol.http; -import io.ipfs.cid.*; -import io.ipfs.multibase.*; -import io.ipfs.multihash.*; import io.libp2p.core.*; import io.libp2p.core.multistream.*; import io.libp2p.protocol.*; @@ -46,9 +43,11 @@ public Sender(Stream stream) { @Override public void onMessage(@NotNull Stream stream, FullHttpResponse msg) { CompletableFuture req = queue.poll(); - if (req != null) - req.complete(msg.copy()); - msg.release(); + if (req != null) { + req.complete(msg.retain()); + } else { + msg.release(); + } stream.close(); } @@ -86,7 +85,7 @@ public Receiver(HttpRequestProcessor requestHandler) { } private void sendReply(HttpContent reply, Stream p2pstream) { - p2pstream.writeAndFlush(reply.copy()); + p2pstream.writeAndFlush(reply.retain()); } @Override @@ -120,7 +119,14 @@ protected void initChannel(Channel ch) throws Exception { Channel ch = fut.channel(); FullHttpRequest retained = msg.retain(); - fut.addListener(x -> ch.writeAndFlush(retained)); + fut.addListener(x -> { + if (x.isSuccess()) + ch.writeAndFlush(retained).addListener(f -> { + retained.release(); + }); + else + retained.release(); + }); } private static final long TRAFFIC_LIMIT = Long.MAX_VALUE; // This is the total inbound or outbound traffic allowed, not a rate diff --git a/src/main/java/org/peergos/util/Futures.java b/src/main/java/org/peergos/util/Futures.java index 83b55de0..1a264d29 100755 --- a/src/main/java/org/peergos/util/Futures.java +++ b/src/main/java/org/peergos/util/Futures.java @@ -19,4 +19,10 @@ private static T logAndThrow(Throwable t, Optional message) { t.printStackTrace(); throw new RuntimeException(t.getMessage(), t); } + + public static CompletableFuture errored(Throwable t) { + CompletableFuture err = new CompletableFuture<>(); + err.completeExceptionally(t); + return err; + } } diff --git a/src/test/java/org/peergos/APIServiceTest.java b/src/test/java/org/peergos/APIServiceTest.java index 9ecd3db8..8f1763ca 100644 --- a/src/test/java/org/peergos/APIServiceTest.java +++ b/src/test/java/org/peergos/APIServiceTest.java @@ -45,7 +45,7 @@ public void runAPIServiceWithFileStorageTest() { @Test public void bulkGetTest() { EmbeddedIpfs ipfs = new EmbeddedIpfs(null, new ProvidingBlockstore(new RamBlockstore()), null, - null, null, Optional.empty(), Collections.emptyList(), Optional.empty()); + null, null, Optional.empty(), Collections.emptyList(), Optional.empty(), Collections.emptyList()); Cid cid1 = ipfs.blockstore.put("Hello".getBytes(), Cid.Codec.Raw).join(); Cid cid2= ipfs.blockstore.put("world!".getBytes(), Cid.Codec.Raw).join(); List wants = new ArrayList<>(); @@ -57,7 +57,7 @@ public void bulkGetTest() { public static void runAPIServiceTest(Blockstore blocks) { EmbeddedIpfs ipfs = new EmbeddedIpfs(null, new ProvidingBlockstore(blocks), null, - null, null, Optional.empty(), Collections.emptyList(), Optional.empty()); + null, null, Optional.empty(), Collections.emptyList(), Optional.empty(), Collections.emptyList()); Cid cid = Cid.decode("zdpuAwfJrGYtiGFDcSV3rDpaUrqCtQZRxMjdC6Eq9PNqLqTGg"); Assert.assertFalse("cid found", ipfs.blockstore.has(cid).join()); String text = "Hello world!"; diff --git a/src/test/java/org/peergos/BitswapMirrorTest.java b/src/test/java/org/peergos/BitswapMirrorTest.java index c3a7edfb..2cb02869 100644 --- a/src/test/java/org/peergos/BitswapMirrorTest.java +++ b/src/test/java/org/peergos/BitswapMirrorTest.java @@ -23,10 +23,10 @@ public class BitswapMirrorTest { //@Ignore // Local testing only for now - run this prior: ./ipfs pin add zdpuAwfJrGYtiGFDcSV3rDpaUrqCtQZRxMjdC6Eq9PNqLqTGg public void mirrorTree() throws IOException { HostBuilder builder1 = HostBuilder.create(TestPorts.getPort(), - new RamProviderStore(1000), new RamRecordStore(), new RamBlockstore(), (c, p, a) -> CompletableFuture.completedFuture(true), false); + new RamProviderStore(1000), new RamRecordStore(), new RamBlockstore(), (c, p, a) -> CompletableFuture.completedFuture(true),false, false); Host node1 = builder1.build(); node1.start().join(); - IdentifyBuilder.addIdentifyProtocol(node1); + IdentifyBuilder.addIdentifyProtocol(node1, Collections.emptyList()); IPFS kubo = new IPFS("localhost", 5001); Multiaddr kuboAddress = Multiaddr.fromString("/ip4/127.0.0.1/tcp/4001/p2p/" + kubo.id().get("ID")); preloadBlocksToKubo(kubo); diff --git a/src/test/java/org/peergos/BitswapTest.java b/src/test/java/org/peergos/BitswapTest.java index 25e90b70..faf337a9 100644 --- a/src/test/java/org/peergos/BitswapTest.java +++ b/src/test/java/org/peergos/BitswapTest.java @@ -1,18 +1,27 @@ package org.peergos; -import io.ipfs.cid.*; +import io.ipfs.cid.Cid; import io.ipfs.multihash.Multihash; -import io.libp2p.core.*; -import io.libp2p.core.multiformats.*; -import org.junit.*; -import org.peergos.blockstore.*; -import org.peergos.protocol.bitswap.*; -import org.peergos.protocol.dht.*; +import io.libp2p.core.Host; +import io.libp2p.core.multiformats.Multiaddr; +import org.junit.Test; +import org.peergos.blockstore.RamBlockstore; +import org.peergos.protocol.bitswap.Bitswap; +import org.peergos.protocol.dht.RamProviderStore; +import org.peergos.protocol.dht.RamRecordStore; -import java.nio.charset.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.stream.*; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; +import java.util.stream.IntStream; public class BitswapTest { private static final Random rnd = new Random(28); @@ -20,11 +29,11 @@ public class BitswapTest { @Test public void getBlock() { HostBuilder builder1 = HostBuilder.create(TestPorts.getPort(), - new RamProviderStore(1000), new RamRecordStore(), new RamBlockstore(), (c, p, a) -> CompletableFuture.completedFuture(true), false); + new RamProviderStore(1000), new RamRecordStore(), new RamBlockstore(), (c, p, a) -> CompletableFuture.completedFuture(true), false, false); Host node1 = builder1.build(); RamBlockstore blockstore2 = new RamBlockstore(); HostBuilder builder2 = HostBuilder.create(TestPorts.getPort(), - new RamProviderStore(1000), new RamRecordStore(), blockstore2, (c, p, a) -> CompletableFuture.completedFuture(true), false); + new RamProviderStore(1000), new RamRecordStore(), blockstore2, (c, p, a) -> CompletableFuture.completedFuture(true), false, false); Host node2 = builder2.build(); node1.start().join(); node2.start().join(); @@ -38,7 +47,7 @@ public void getBlock() { .stream() .map(f -> f.join()) .collect(Collectors.toList()); - if (! Arrays.equals(receivedBlock.get(0).block, blockData)) + if (!Arrays.equals(receivedBlock.get(0).block, blockData)) throw new IllegalStateException("Incorrect block returned!"); } finally { node1.stop(); @@ -49,11 +58,11 @@ public void getBlock() { @Test public void getTenBlocks() { HostBuilder builder1 = HostBuilder.create(TestPorts.getPort(), - new RamProviderStore(1000), new RamRecordStore(), new RamBlockstore(), (c, p, a) -> CompletableFuture.completedFuture(true), false); + new RamProviderStore(1000), new RamRecordStore(), new RamBlockstore(), (c, p, a) -> CompletableFuture.completedFuture(true), false, false); Host node1 = builder1.build(); RamBlockstore blockstore2 = new RamBlockstore(); HostBuilder builder2 = HostBuilder.create(TestPorts.getPort(), - new RamProviderStore(1000), new RamRecordStore(), blockstore2, (c, p, a) -> CompletableFuture.completedFuture(true), false); + new RamProviderStore(1000), new RamRecordStore(), blockstore2, (c, p, a) -> CompletableFuture.completedFuture(true), false, false); Host node2 = builder2.build(); node1.start().join(); node2.start().join(); @@ -61,8 +70,8 @@ public void getTenBlocks() { Multiaddr address2 = node2.listenAddresses().get(0); List hashes = new ArrayList<>(); Random random = new Random(28); - for (int i=0; i < 10; i++) { - byte[] blockData = new byte[1024*1024]; + for (int i = 0; i < 10; i++) { + byte[] blockData = new byte[1024 * 1024]; random.nextBytes(blockData); Cid hash = blockstore2.put(blockData, Cid.Codec.Raw).join(); hashes.add(hash); @@ -85,7 +94,7 @@ public void getTenBlocks() { @Test public void blockFlooder() { HostBuilder builder1 = HostBuilder.create(TestPorts.getPort(), - new RamProviderStore(1000), new RamRecordStore(), new RamBlockstore(), (c, p, a) -> CompletableFuture.completedFuture(true), false); + new RamProviderStore(1000), new RamRecordStore(), new RamBlockstore(), (c, p, a) -> CompletableFuture.completedFuture(true), false, false); Host flooder = builder1.build(); RamBlockstore blockstore2 = new RamBlockstore(); HostBuilder builder2 = HostBuilder.create(TestPorts.getPort(), @@ -112,7 +121,7 @@ public void blockFlooder() { if (Arrays.equals(receivedBlockAgain.get(0).block, blockData)) throw new IllegalStateException("Received block!"); } catch (CompletionException t) { - if (! (t.getCause() instanceof TimeoutException)) + if (!(t.getCause() instanceof TimeoutException)) throw t; } } finally { diff --git a/src/test/java/org/peergos/BootstrapTest.java b/src/test/java/org/peergos/BootstrapTest.java index 30fb8d15..5940ed97 100644 --- a/src/test/java/org/peergos/BootstrapTest.java +++ b/src/test/java/org/peergos/BootstrapTest.java @@ -1,17 +1,20 @@ package org.peergos; -import io.ipfs.multiaddr.*; +import io.ipfs.multiaddr.MultiAddress; import io.ipfs.multihash.Multihash; -import io.libp2p.core.*; -import org.junit.*; -import org.peergos.blockstore.*; -import org.peergos.protocol.*; -import org.peergos.protocol.dht.*; +import io.libp2p.core.Host; +import org.junit.Test; +import org.peergos.blockstore.RamBlockstore; +import org.peergos.protocol.IdentifyBuilder; +import org.peergos.protocol.dht.Kademlia; +import org.peergos.protocol.dht.RamProviderStore; +import org.peergos.protocol.dht.RamRecordStore; -import java.util.*; -import java.util.concurrent.*; -import java.util.function.*; -import java.util.stream.*; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.Predicate; +import java.util.stream.Collectors; public class BootstrapTest { @@ -37,10 +40,10 @@ public class BootstrapTest { @Test public void bootstrap() { HostBuilder builder1 = HostBuilder.create(TestPorts.getPort(), - new RamProviderStore(1000), new RamRecordStore(), new RamBlockstore(), (c, p, a) -> CompletableFuture.completedFuture(true), false); + new RamProviderStore(1000), new RamRecordStore(), new RamBlockstore(), (c, p, a) -> CompletableFuture.completedFuture(true), false, false); Host node1 = builder1.build(); node1.start().join(); - IdentifyBuilder.addIdentifyProtocol(node1); + IdentifyBuilder.addIdentifyProtocol(node1, Collections.emptyList()); Multihash node1Id = Multihash.deserialize(node1.getPeerId().getBytes()); try { @@ -53,7 +56,7 @@ public void bootstrap() { // lookup ourselves in DHT to find our nearest nodes List closestPeers = dht.findClosestPeers(node1Id, 20, node1); - if (closestPeers.size() < connections/2) + if (closestPeers.size() < connections / 2) throw new IllegalStateException("Didn't find more close peers after bootstrap: " + closestPeers.size() + " < " + connections); } finally { diff --git a/src/test/java/org/peergos/EmbeddedIpfsTest.java b/src/test/java/org/peergos/EmbeddedIpfsTest.java index 3303aad7..d2baa7aa 100644 --- a/src/test/java/org/peergos/EmbeddedIpfsTest.java +++ b/src/test/java/org/peergos/EmbeddedIpfsTest.java @@ -1,24 +1,42 @@ package org.peergos; -import identify.pb.*; -import io.ipfs.cid.*; -import io.ipfs.multiaddr.*; +import com.sun.net.httpserver.HttpServer; +import identify.pb.IdentifyOuterClass; +import io.ipfs.cid.Cid; +import io.ipfs.multiaddr.MultiAddress; import io.ipfs.multihash.Multihash; -import io.libp2p.core.*; -import io.libp2p.core.crypto.*; -import io.libp2p.core.multiformats.*; -import io.libp2p.crypto.keys.*; -import io.libp2p.protocol.*; -import org.junit.*; -import org.peergos.blockstore.*; -import org.peergos.config.*; -import org.peergos.protocol.dht.*; -import org.peergos.protocol.ipns.*; - -import java.time.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.stream.*; +import io.libp2p.core.PeerId; +import io.libp2p.core.crypto.PrivKey; +import io.libp2p.core.multiformats.Multiaddr; +import io.libp2p.crypto.keys.Ed25519Kt; +import io.libp2p.protocol.Identify; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpVersion; +import org.junit.Assert; +import org.junit.Test; +import org.peergos.blockstore.RamBlockstore; +import org.peergos.config.IdentitySection; +import org.peergos.protocol.dht.RamRecordStore; +import org.peergos.protocol.http.HttpProtocol; +import org.peergos.protocol.ipns.IPNS; + +import java.io.ByteArrayOutputStream; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.time.LocalDateTime; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; public class EmbeddedIpfsTest { @@ -35,7 +53,73 @@ public void largeBlock() throws Exception { Cid block = node2.blockstore.put(new byte[1024 * 1024], Cid.Codec.Raw).join(); PeerId peerId2 = node2.node.getPeerId(); List retrieved = ForkJoinPool.commonPool().submit( - () -> node1.getBlocks(List.of(new Want(block)), Set.of(peerId2), false)) + () -> node1.getBlocks(List.of(new Want(block)), Set.of(peerId2), false)) + .get(5, TimeUnit.SECONDS); + Assert.assertTrue(retrieved.size() == 1); + + node1.stop(); + node2.stop(); + } + + @Test + public void largeWrite() throws Exception { + System.setProperty("io.netty.leakDetection.level", "advanced"); + // Start proxy target + InetSocketAddress proxyTarget = new InetSocketAddress("localhost", 7777); + HttpServer target = HttpServer.create(proxyTarget, 20); + String reply = "AllGood"; + byte[] replyBytes = reply.getBytes(); + target.createContext("/", ex -> { + ex.sendResponseHeaders(200, replyBytes.length); + OutputStream out = ex.getResponseBody(); + out.write(replyBytes); + out.flush(); + out.close(); + }); + target.start(); + + HttpProtocol.HttpRequestProcessor http1 = (s, req, h) -> HttpProtocol.proxyRequest(req, proxyTarget, h); + EmbeddedIpfs node1 = build(Collections.emptyList(), List.of(new MultiAddress("/ip4/127.0.0.1/tcp/" + TestPorts.getPort())), Optional.of(http1)); + node1.start(); + + HttpProtocol.HttpRequestProcessor http2 = (s, req, h) -> HttpProtocol.proxyRequest(req, new InetSocketAddress("localhost", 7778), h); + EmbeddedIpfs node2 = build(node1.node.listenAddresses() + .stream() + .map(a -> new MultiAddress(a.toString())) + .collect(Collectors.toList()), List.of(new MultiAddress("/ip4/127.0.0.1/tcp/" + TestPorts.getPort())), Optional.of(http2)); + node2.start(); + + for (int i = 0; i < 1000; i++) { + ByteBuf largeBody = Unpooled.buffer(2 * 1024 * 1024); + DefaultFullHttpRequest req = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/hey", largeBody); + HttpProtocol.HttpController http = node2.p2pHttp.get().dial(node2.node, node1.node.getPeerId(), node1.node.listenAddresses().toArray(Multiaddr[]::new)) + .getController().join(); + FullHttpResponse resp = http.send(req).join(); + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + int contentLength = resp.headers().getInt("content-length"); + resp.content().readBytes(bout, contentLength); + byte[] body = bout.toByteArray(); + Assert.assertTrue("Correct response", Arrays.equals(body, replyBytes)); + resp.release(); + resp.release(); + } + + node1.stop(); + node2.stop(); + } + + @Test + public void mdnsDiscovery() throws Exception { + EmbeddedIpfs node1 = build(Collections.emptyList(), List.of(new MultiAddress("/ip4/127.0.0.1/tcp/" + TestPorts.getPort()))); + node1.start(); + EmbeddedIpfs node2 = build(Collections.emptyList(), List.of(new MultiAddress("/ip4/127.0.0.1/tcp/" + TestPorts.getPort()))); + node2.start(); + + Thread.sleep(5_000); + Cid block = node2.blockstore.put(new byte[1024], Cid.Codec.Raw).join(); + PeerId peerId2 = node2.node.getPeerId(); + List retrieved = ForkJoinPool.commonPool().submit( + () -> node1.getBlocks(List.of(new Want(block)), Set.of(peerId2), false)) .get(5, TimeUnit.SECONDS); Assert.assertTrue(retrieved.size() == 1); @@ -65,7 +149,7 @@ public void publishPresignedValue() throws Exception { PrivKey publisher = Ed25519Kt.generateEd25519KeyPair().getFirst(); byte[] value = "This is a test".getBytes(); io.ipfs.multihash.Multihash pub = Multihash.deserialize(PeerId.fromPubKey(publisher.publicKey()).getBytes()); - long hoursTtl = 24*365; + long hoursTtl = 24 * 365; LocalDateTime expiry = LocalDateTime.now().plusHours(hoursTtl); long ttlNanos = hoursTtl * 3600_000_000_000L; byte[] signedRecord = IPNS.createSignedRecord(value, expiry, 1, ttlNanos, publisher); @@ -119,12 +203,16 @@ public void wildcardListenerAddressesGetExpanded() { } public static EmbeddedIpfs build(List bootstrap, List swarmAddresses) { + return build(bootstrap, swarmAddresses, Optional.empty()); + } + + public static EmbeddedIpfs build(List bootstrap, List swarmAddresses, Optional http) { BlockRequestAuthoriser blockRequestAuthoriser = (c, p, a) -> CompletableFuture.completedFuture(true); HostBuilder builder = new HostBuilder().generateIdentity(); PrivKey privKey = builder.getPrivateKey(); PeerId peerId = builder.getPeerId(); IdentitySection id = new IdentitySection(privKey.bytes(), peerId); return EmbeddedIpfs.build(new RamRecordStore(), new RamBlockstore(), true, swarmAddresses, bootstrap, - id, blockRequestAuthoriser, Optional.empty(), false); + id, blockRequestAuthoriser, http, false); } } diff --git a/src/test/java/org/peergos/FindPeerTest.java b/src/test/java/org/peergos/FindPeerTest.java index 313e780c..99fc202b 100644 --- a/src/test/java/org/peergos/FindPeerTest.java +++ b/src/test/java/org/peergos/FindPeerTest.java @@ -18,10 +18,10 @@ public class FindPeerTest { public void findLongRunningNode() { RamBlockstore blockstore1 = new RamBlockstore(); HostBuilder builder1 = HostBuilder.create(TestPorts.getPort(), - new RamProviderStore(1000), new RamRecordStore(), blockstore1, (c, p, a) -> CompletableFuture.completedFuture(true), false); + new RamProviderStore(1000), new RamRecordStore(), blockstore1, (c, p, a) -> CompletableFuture.completedFuture(true), false, false); Host node1 = builder1.build(); node1.start().join(); - IdentifyBuilder.addIdentifyProtocol(node1); + IdentifyBuilder.addIdentifyProtocol(node1, Collections.emptyList()); try { // bootstrap node 1 @@ -51,7 +51,7 @@ private static long findAndDialPeer(Multihash toFind, Kademlia dht1, Host node1) Multiaddr[] addrs = peer.getPublicAddresses().stream().map(a -> Multiaddr.fromString(a.toString())).toArray(Multiaddr[]::new); dht1.dial(node1, PeerId.fromBase58(peer.peerId.toBase58()), addrs) .getController().join().closerPeers(toFind.toBytes()).join(); - System.out.println("Peer lookup took " + (t2-t1) + "ms"); + System.out.println("Peer lookup took " + (t2 - t1) + "ms"); return t2 - t1; } } diff --git a/src/test/java/org/peergos/FindProviderTest.java b/src/test/java/org/peergos/FindProviderTest.java index 1c724f85..ce563c89 100644 --- a/src/test/java/org/peergos/FindProviderTest.java +++ b/src/test/java/org/peergos/FindProviderTest.java @@ -20,7 +20,7 @@ public class FindProviderTest { public void findBlockProvider() { RamBlockstore blockstore = new RamBlockstore(); HostBuilder builder1 = HostBuilder.create(TestPorts.getPort(), - new RamProviderStore(1000), new RamRecordStore(), blockstore, (c, p, a) -> CompletableFuture.completedFuture(true), false); + new RamProviderStore(1000), new RamRecordStore(), blockstore, (c, p, a) -> CompletableFuture.completedFuture(true), false, false); Host node1 = builder1.build(); node1.start().join(); diff --git a/src/test/java/org/peergos/HandlerTest.java b/src/test/java/org/peergos/HandlerTest.java index 3c0027e8..f9ea7894 100644 --- a/src/test/java/org/peergos/HandlerTest.java +++ b/src/test/java/org/peergos/HandlerTest.java @@ -9,17 +9,27 @@ import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; -import org.peergos.blockstore.*; +import org.peergos.blockstore.Blockstore; +import org.peergos.blockstore.ProvidingBlockstore; +import org.peergos.blockstore.RamBlockstore; +import org.peergos.blockstore.TypeLimitedBlockstore; import org.peergos.client.NabuClient; import org.peergos.net.APIHandler; -import org.peergos.protocol.bitswap.*; +import org.peergos.protocol.bitswap.Bitswap; +import org.peergos.protocol.bitswap.BitswapEngine; import org.peergos.protocol.dht.Kademlia; import org.peergos.protocol.dht.RamProviderStore; import org.peergos.protocol.dht.RamRecordStore; import java.io.IOException; import java.net.InetSocketAddress; -import java.util.*; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.function.Predicate; @@ -46,7 +56,7 @@ public void codecTest() { apiServer = HttpServer.create(localAPIAddress, 500); Blockstore blocks = new TypeLimitedBlockstore(new RamBlockstore(), Set.of(Cid.Codec.Raw)); EmbeddedIpfs ipfs = new EmbeddedIpfs(null, new ProvidingBlockstore(blocks), null, null, - new Bitswap(new BitswapEngine(null, null, Bitswap.MAX_MESSAGE_SIZE)), Optional.empty(), Collections.emptyList(), Optional.empty()); + new Bitswap(new BitswapEngine(null, null, Bitswap.MAX_MESSAGE_SIZE)), Optional.empty(), Collections.emptyList(), Optional.empty(), Collections.emptyList()); apiServer.createContext(APIHandler.API_URL, new APIHandler(ipfs)); apiServer.setExecutor(Executors.newFixedThreadPool(50)); apiServer.start(); @@ -78,12 +88,13 @@ public void codecTest() { } } } + @Test @Ignore public void findBlockProviderTest() throws IOException { RamBlockstore blockstore = new RamBlockstore(); HostBuilder builder1 = HostBuilder.create(10000 + new Random().nextInt(50000), - new RamProviderStore(1000), new RamRecordStore(), blockstore, (c, p, a) -> CompletableFuture.completedFuture(true), false); + new RamProviderStore(1000), new RamRecordStore(), blockstore, (c, p, a) -> CompletableFuture.completedFuture(true), false, false); Host node1 = builder1.build(); node1.start().join(); HttpServer apiServer = null; @@ -100,7 +111,7 @@ public void findBlockProviderTest() throws IOException { apiServer = HttpServer.create(localAPIAddress, 500); EmbeddedIpfs ipfs = new EmbeddedIpfs(node1, new ProvidingBlockstore(new RamBlockstore()), null, dht, - null, Optional.empty(), Collections.emptyList(), Optional.empty()); + null, Optional.empty(), Collections.emptyList(), Optional.empty(), Collections.emptyList()); apiServer.createContext(APIHandler.API_URL, new APIHandler(ipfs)); apiServer.setExecutor(Executors.newFixedThreadPool(50)); apiServer.start(); @@ -117,6 +128,7 @@ public void findBlockProviderTest() throws IOException { } } } + @Test public void blockMethodsTest() { HttpServer apiServer = null; @@ -126,7 +138,7 @@ public void blockMethodsTest() { apiServer = HttpServer.create(localAPIAddress, 500); EmbeddedIpfs ipfs = new EmbeddedIpfs(null, new ProvidingBlockstore(new RamBlockstore()), null, - null, new Bitswap(new BitswapEngine(null, null, Bitswap.MAX_MESSAGE_SIZE)), Optional.empty(), Collections.emptyList(), Optional.empty()); + null, new Bitswap(new BitswapEngine(null, null, Bitswap.MAX_MESSAGE_SIZE)), Optional.empty(), Collections.emptyList(), Optional.empty(), Collections.emptyList()); apiServer.createContext(APIHandler.API_URL, new APIHandler(ipfs)); apiServer.setExecutor(Executors.newFixedThreadPool(50)); apiServer.start(); @@ -139,7 +151,7 @@ public void blockMethodsTest() { byte[] block = text.getBytes(); Cid addedHash = nabu.putBlock(block, Optional.of("raw")); - int size = nabu.stat(addedHash); + int size = nabu.stat(addedHash); Assert.assertTrue("size as expected", size == text.length()); boolean has = nabu.hasBlock(addedHash, Optional.empty()); diff --git a/src/test/java/org/peergos/HttpProxyTest.java b/src/test/java/org/peergos/HttpProxyTest.java index e5ed575f..bc22c381 100644 --- a/src/test/java/org/peergos/HttpProxyTest.java +++ b/src/test/java/org/peergos/HttpProxyTest.java @@ -1,25 +1,39 @@ package org.peergos; -import com.sun.net.httpserver.*; +import com.sun.net.httpserver.HttpServer; import io.ipfs.cid.Cid; import io.ipfs.multihash.Multihash; -import io.libp2p.core.*; -import io.libp2p.core.multiformats.*; -import io.netty.handler.codec.http.*; -import org.junit.*; -import org.peergos.blockstore.*; -import org.peergos.protocol.*; -import org.peergos.protocol.dht.*; -import org.peergos.protocol.http.*; -import org.peergos.util.*; -import org.peergos.util.HttpUtil; - -import java.io.*; -import java.net.*; -import java.nio.charset.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.zip.*; +import io.libp2p.core.Host; +import io.libp2p.core.PeerId; +import io.libp2p.core.multiformats.Multiaddr; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpVersion; +import org.junit.Assert; +import org.junit.Test; +import org.peergos.blockstore.RamBlockstore; +import org.peergos.protocol.IdentifyBuilder; +import org.peergos.protocol.dht.Kademlia; +import org.peergos.protocol.dht.RamProviderStore; +import org.peergos.protocol.dht.RamRecordStore; +import org.peergos.protocol.http.HttpProtocol; +import org.peergos.util.JSONParser; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.zip.GZIPInputStream; public class HttpProxyTest { @@ -34,19 +48,19 @@ public void peerid() { public void p2pProxyRequest() throws IOException { InetSocketAddress unusedProxyTarget = new InetSocketAddress("127.0.0.1", 7000); HostBuilder builder1 = HostBuilder.create(TestPorts.getPort(), - new RamProviderStore(1000), new RamRecordStore(), new RamBlockstore(), (c, p, a) -> CompletableFuture.completedFuture(true), false) + new RamProviderStore(1000), new RamRecordStore(), new RamBlockstore(), (c, p, a) -> CompletableFuture.completedFuture(true), false, false) .addProtocol(new HttpProtocol.Binding(unusedProxyTarget)); Host node1 = builder1.build(); InetSocketAddress proxyTarget = new InetSocketAddress("127.0.0.1", TestPorts.getPort()); HostBuilder builder2 = HostBuilder.create(TestPorts.getPort(), - new RamProviderStore(1000), new RamRecordStore(), new RamBlockstore(), (c, p, a) -> CompletableFuture.completedFuture(true), false) + new RamProviderStore(1000), new RamRecordStore(), new RamBlockstore(), (c, p, a) -> CompletableFuture.completedFuture(true), false, false) .addProtocol(new HttpProtocol.Binding(proxyTarget)); Host node2 = builder2.build(); node1.start().join(); node2.start().join(); // start local server with fixed HTTP response - byte[] httpReply = new byte[1024*1024]; + byte[] httpReply = new byte[1024 * 1024]; new Random(42).nextBytes(httpReply); HttpServer localhostServer = HttpServer.create(proxyTarget, 20); String headerName = "Random-Header"; @@ -79,7 +93,7 @@ public void p2pProxyRequest() throws IOException { ByteArrayOutputStream bout = new ByteArrayOutputStream(); resp.content().readBytes(bout, resp.headers().getInt("content-length")); - Assert.assertTrue(resp.headers().get(headerName).equals(headerValue)); + Assert.assertEquals(resp.headers().get(headerName), headerValue); resp.release(); byte[] replyBody = bout.toByteArray(); equal(replyBody, httpReply); @@ -95,11 +109,11 @@ public void p2pProxyRequest() throws IOException { public void p2pProxyClientTest() throws Exception { InetSocketAddress unusedProxyTarget = new InetSocketAddress("127.0.0.1", 7000); HostBuilder builder1 = HostBuilder.create(TestPorts.getPort(), - new RamProviderStore(1000), new RamRecordStore(), new RamBlockstore(), (c, p, a) -> CompletableFuture.completedFuture(true), false) + new RamProviderStore(1000), new RamRecordStore(), new RamBlockstore(), (c, p, a) -> CompletableFuture.completedFuture(true), false, false) .addProtocol(new HttpProtocol.Binding(unusedProxyTarget)); Host node1 = builder1.build(); node1.start().join(); - IdentifyBuilder.addIdentifyProtocol(node1); + IdentifyBuilder.addIdentifyProtocol(node1, Collections.emptyList()); Kademlia dht = builder1.getWanDht().get(); dht.bootstrapRoutingTable(node1, BootstrapTest.BOOTSTRAP_NODES, a -> true); System.out.println("Bootstrapping node..."); @@ -136,7 +150,7 @@ public void p2pProxyClientTest() throws Exception { GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(bout.toByteArray())); - List reply = (List)JSONParser.parse(new String(readFully(gzip))); + List reply = (List) JSONParser.parse(new String(readFully(gzip))); Assert.assertTrue(reply.contains("peergos")); } System.out.println("Average: " + totalTime / count); @@ -146,10 +160,10 @@ public void p2pProxyClientTest() throws Exception { } public static byte[] readFully(InputStream in) throws IOException { - ByteArrayOutputStream bout = new ByteArrayOutputStream(); - byte[] b = new byte[0x1000]; + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + byte[] b = new byte[0x1000]; int nRead; - while ((nRead = in.read(b, 0, b.length)) != -1 ) + while ((nRead = in.read(b, 0, b.length)) != -1) bout.write(b, 0, nRead); in.close(); return bout.toByteArray(); @@ -161,9 +175,9 @@ private static void equal(byte[] a, byte[] b) { for (int i = 0; i < a.length; i++) if (a[i] != b[i]) { byte[] diff = Arrays.copyOfRange(a, i, i + 24); - int j=0; - for (;j < b.length-2;j++) - if (b[j] == diff[0] && b[j+1] == diff[1]&& b[j+2] == diff[2]) + int j = 0; + for (; j < b.length - 2; j++) + if (b[j] == diff[0] && b[j + 1] == diff[1] && b[j + 2] == diff[2]) break; throw new IllegalStateException("bytes differ at " + i + " " + a[i] + " != " + b[i]); } diff --git a/src/test/java/org/peergos/IpnsTest.java b/src/test/java/org/peergos/IpnsTest.java index adbe96e5..e809bd3f 100644 --- a/src/test/java/org/peergos/IpnsTest.java +++ b/src/test/java/org/peergos/IpnsTest.java @@ -23,15 +23,15 @@ public class IpnsTest { public void publishIPNSRecordToKubo() throws IOException { RamBlockstore blockstore1 = new RamBlockstore(); HostBuilder builder1 = HostBuilder.create(TestPorts.getPort(), - new RamProviderStore(1000), new RamRecordStore(), blockstore1, (c, p, a) -> CompletableFuture.completedFuture(true), false); + new RamProviderStore(1000), new RamRecordStore(), blockstore1, (c, p, a) -> CompletableFuture.completedFuture(true), false, false); Host node1 = builder1.build(); node1.start().join(); - IdentifyBuilder.addIdentifyProtocol(node1); + IdentifyBuilder.addIdentifyProtocol(node1, Collections.emptyList()); Multihash node1Id = Multihash.deserialize(node1.getPeerId().getBytes()); try { IPFS kubo = new IPFS("localhost", 5001); - String kuboID = (String)kubo.id().get("ID"); + String kuboID = (String) kubo.id().get("ID"); Multiaddr address2 = Multiaddr.fromString("/ip4/127.0.0.1/tcp/4001/p2p/" + kuboID); Cid block = blockstore1.put("Provide me.".getBytes(), Cid.Codec.Raw).join(); Kademlia dht = builder1.getWanDht().get(); @@ -55,10 +55,10 @@ public void publishIPNSRecordToKubo() throws IOException { } catch (Exception timeout) { } } - if (! success) + if (!success) throw new IllegalStateException("Failed to publish IPNS record!"); GetResult getresult = dht.dial(node1, address2).getController().join().getValue(node1Id).join(); - if (! getresult.record.isPresent()) + if (!getresult.record.isPresent()) throw new IllegalStateException("Kubo didn't return our published IPNS record!"); } finally { node1.stop(); @@ -70,14 +70,14 @@ public void publishIPNSRecordToKubo() throws IOException { public void retrieveKuboPublishedIPNS() throws IOException { RamBlockstore blockstore1 = new RamBlockstore(); HostBuilder builder1 = HostBuilder.create(10000 + new Random().nextInt(50000), - new RamProviderStore(1000), new RamRecordStore(), blockstore1, (c, p, a) -> CompletableFuture.completedFuture(true), false); + new RamProviderStore(1000), new RamRecordStore(), blockstore1, (c, p, a) -> CompletableFuture.completedFuture(true), false, false); Host node1 = builder1.build(); node1.start().join(); - IdentifyBuilder.addIdentifyProtocol(node1); + IdentifyBuilder.addIdentifyProtocol(node1, Collections.emptyList()); try { IPFS kubo = new IPFS("localhost", 5001); - String kuboIDString = (String)kubo.id().get("ID"); + String kuboIDString = (String) kubo.id().get("ID"); Multihash kuboId = Multihash.fromBase58(kuboIDString); Multiaddr address2 = Multiaddr.fromString("/ip4/127.0.0.1/tcp/4001/p2p/" + kuboIDString); @@ -88,7 +88,8 @@ public void retrieveKuboPublishedIPNS() throws IOException { GetResult kuboIpnsGet = wanDht.dial(node1, address2).getController().join().getValue(kuboId).join(); LinkedBlockingDeque queue = new LinkedBlockingDeque<>(); queue.addAll(kuboIpnsGet.closerPeers); - outer: for (int i=0; i < 100; i++) { + outer: + for (int i = 0; i < 100; i++) { if (kuboIpnsGet.record.isPresent()) break; PeerAddresses closer = queue.poll(); @@ -96,7 +97,7 @@ public void retrieveKuboPublishedIPNS() throws IOException { .map(a -> a.toString()) .filter(a -> a.contains("tcp") && a.contains("ip4") && !a.contains("127.0.0.1") && !a.contains("/172.")) .collect(Collectors.toList()); - for (String candidate: candidates) { + for (String candidate : candidates) { try { kuboIpnsGet = wanDht.dial(node1, Multiaddr.fromString(candidate + "/p2p/" + closer.peerId)).getController().join() .getValue(kuboId).join(); diff --git a/src/test/java/org/peergos/KademliaTest.java b/src/test/java/org/peergos/KademliaTest.java index df9bcbb8..90820955 100644 --- a/src/test/java/org/peergos/KademliaTest.java +++ b/src/test/java/org/peergos/KademliaTest.java @@ -1,18 +1,28 @@ package org.peergos; -import io.ipfs.cid.*; +import io.ipfs.cid.Cid; import io.ipfs.multihash.Multihash; -import io.libp2p.core.*; -import io.libp2p.core.crypto.*; -import io.libp2p.core.multiformats.*; -import io.libp2p.crypto.keys.*; -import org.junit.*; -import org.peergos.blockstore.*; -import org.peergos.protocol.*; -import org.peergos.protocol.dht.*; - -import java.util.*; -import java.util.concurrent.*; +import io.libp2p.core.Host; +import io.libp2p.core.PeerId; +import io.libp2p.core.crypto.PrivKey; +import io.libp2p.core.multiformats.Multiaddr; +import io.libp2p.crypto.keys.Ed25519Kt; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; +import org.peergos.blockstore.RamBlockstore; +import org.peergos.protocol.IdentifyBuilder; +import org.peergos.protocol.dht.Kademlia; +import org.peergos.protocol.dht.KademliaEngine; +import org.peergos.protocol.dht.RamProviderStore; +import org.peergos.protocol.dht.RamRecordStore; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; public class KademliaTest { @@ -20,16 +30,16 @@ public class KademliaTest { public void findOtherNode() throws Exception { RamBlockstore blockstore1 = new RamBlockstore(); HostBuilder builder1 = HostBuilder.create(TestPorts.getPort(), - new RamProviderStore(1000), new RamRecordStore(), blockstore1, (c, p, a) -> CompletableFuture.completedFuture(true), false); + new RamProviderStore(1000), new RamRecordStore(), blockstore1, (c, p, a) -> CompletableFuture.completedFuture(true), false, false); Host node1 = builder1.build(); node1.start().join(); - IdentifyBuilder.addIdentifyProtocol(node1); + IdentifyBuilder.addIdentifyProtocol(node1, Collections.emptyList()); HostBuilder builder2 = HostBuilder.create(TestPorts.getPort(), - new RamProviderStore(1000), new RamRecordStore(), new RamBlockstore(), (c, p, a) -> CompletableFuture.completedFuture(true), false); + new RamProviderStore(1000), new RamRecordStore(), new RamBlockstore(), (c, p, a) -> CompletableFuture.completedFuture(true), false, false); Host node2 = builder2.build(); node2.start().join(); - IdentifyBuilder.addIdentifyProtocol(node2); + IdentifyBuilder.addIdentifyProtocol(node2, Collections.emptyList()); try { // bootstrap node 2 @@ -61,16 +71,16 @@ public void findOtherNode() throws Exception { public void ipnsBenchmark() throws Exception { RamBlockstore blockstore1 = new RamBlockstore(); HostBuilder builder1 = HostBuilder.create(TestPorts.getPort(), - new RamProviderStore(1000), new RamRecordStore(), blockstore1, (c, p, a) -> CompletableFuture.completedFuture(true), false); + new RamProviderStore(1000), new RamRecordStore(), blockstore1, (c, p, a) -> CompletableFuture.completedFuture(true), false, false); Host node1 = builder1.build(); node1.start().join(); - IdentifyBuilder.addIdentifyProtocol(node1); + IdentifyBuilder.addIdentifyProtocol(node1, Collections.emptyList()); HostBuilder builder2 = HostBuilder.create(TestPorts.getPort(), - new RamProviderStore(1000), new RamRecordStore(), new RamBlockstore(), (c, p, a) -> CompletableFuture.completedFuture(true), false); + new RamProviderStore(1000), new RamRecordStore(), new RamBlockstore(), (c, p, a) -> CompletableFuture.completedFuture(true), false, false); Host node2 = builder2.build(); node2.start().join(); - IdentifyBuilder.addIdentifyProtocol(node2); + IdentifyBuilder.addIdentifyProtocol(node2, Collections.emptyList()); Cid value = blockstore1.put("Publish me.".getBytes(), Cid.Codec.Raw).join(); @@ -96,8 +106,8 @@ public void ipnsBenchmark() throws Exception { long p0 = System.currentTimeMillis(); int publishes = dht1.publishIpnsValue(signer, pub, value, 1, node1).join(); long p1 = System.currentTimeMillis(); - System.out.println("Publish took " + printSeconds(p1-p0) + "s to " + publishes + " peers."); - publishTotal += p1-p0; + System.out.println("Publish took " + printSeconds(p1 - p0) + "s to " + publishes + " peers."); + publishTotal += p1 - p0; // retrieve it from node 2 long t0 = System.currentTimeMillis(); @@ -105,10 +115,10 @@ public void ipnsBenchmark() throws Exception { long t1 = System.currentTimeMillis(); Assert.assertTrue(res.equals("/ipfs/" + value)); System.out.println("Resolved in " + printSeconds(t1 - t0) + "s"); - resolveTotal += t1-t0; + resolveTotal += t1 - t0; } - System.out.println("Publish av: " + printSeconds(publishTotal/iterations) - + ", resolve av: " + printSeconds(resolveTotal/iterations)); + System.out.println("Publish av: " + printSeconds(publishTotal / iterations) + + ", resolve av: " + printSeconds(resolveTotal / iterations)); // retrieve all again for (PrivKey signer : signers) { @@ -126,7 +136,7 @@ public void ipnsBenchmark() throws Exception { } public static String printSeconds(long millis) { - return millis / 1000 + "." + (millis % 1000)/100; + return millis / 1000 + "." + (millis % 1000) / 100; } @Test @@ -136,9 +146,9 @@ public void kademliaFindNodeLimitTest() { new RamProviderStore(1000), new RamRecordStore(), new RamBlockstore()); RamAddressBook addrs = new RamAddressBook(); kad.setAddressBook(addrs); - for (int i=0; i < 1000; i++) { + for (int i = 0; i < 1000; i++) { PeerId peer = new HostBuilder().generateIdentity().getPeerId(); - for (int j=0; j < 100; j++) { + for (int j = 0; j < 100; j++) { kad.addIncomingConnection(peer); addrs.addAddrs(peer, 0, new Multiaddr[]{new Multiaddr("/ip4/127.0.0.1/tcp/4001/p2p/" + peer.toBase58())}); } diff --git a/src/test/java/org/peergos/KuboFindProviderTest.java b/src/test/java/org/peergos/KuboFindProviderTest.java index bf811e5a..f9c9a9b2 100644 --- a/src/test/java/org/peergos/KuboFindProviderTest.java +++ b/src/test/java/org/peergos/KuboFindProviderTest.java @@ -20,11 +20,11 @@ public class KuboFindProviderTest { @Test public void findProviderOverYamux() throws IOException { HostBuilder builder1 = HostBuilder.create(TestPorts.getPort(), new RamProviderStore(1000), - new RamRecordStore(), new RamBlockstore(), (c, p, a) -> CompletableFuture.completedFuture(true), false) + new RamRecordStore(), new RamBlockstore(), (c, p, a) -> CompletableFuture.completedFuture(true), false, false) .addMuxers(List.of(StreamMuxerProtocol.getYamux())); Host node1 = builder1.build(); node1.start().join(); - IdentifyBuilder.addIdentifyProtocol(node1); + IdentifyBuilder.addIdentifyProtocol(node1, Collections.emptyList()); try { IPFS kubo = new IPFS("localhost", 5001); Multiaddr address2 = Multiaddr.fromString("/ip4/127.0.0.1/tcp/4001/p2p/" + kubo.id().get("ID")); diff --git a/src/test/java/org/peergos/KuboPingTest.java b/src/test/java/org/peergos/KuboPingTest.java index ff8b00ae..da976b4f 100644 --- a/src/test/java/org/peergos/KuboPingTest.java +++ b/src/test/java/org/peergos/KuboPingTest.java @@ -26,7 +26,7 @@ public void runPingOverYamux() throws IOException { .addMuxers(List.of(StreamMuxerProtocol.getYamux())) .build(); node1.start().join(); - IdentifyBuilder.addIdentifyProtocol(node1); + IdentifyBuilder.addIdentifyProtocol(node1, Collections.emptyList()); try { IPFS kubo = new IPFS("localhost", 5001); Multiaddr address2 = Multiaddr.fromString("/ip4/127.0.0.1/tcp/4001/p2p/" + kubo.id().get("ID")); diff --git a/src/test/java/org/peergos/KuboTest.java b/src/test/java/org/peergos/KuboTest.java index e1041662..36b88b86 100644 --- a/src/test/java/org/peergos/KuboTest.java +++ b/src/test/java/org/peergos/KuboTest.java @@ -21,7 +21,7 @@ public void getBlock() throws IOException { Bitswap bitswap1 = new Bitswap(new BitswapEngine(new RamBlockstore(), (c, p, a) -> CompletableFuture.completedFuture(true), Bitswap.MAX_MESSAGE_SIZE)); Host node1 = HostBuilder.build(TestPorts.getPort(), List.of(bitswap1)); node1.start().join(); - IdentifyBuilder.addIdentifyProtocol(node1); + IdentifyBuilder.addIdentifyProtocol(node1, Collections.emptyList()); try { IPFS kubo = new IPFS("localhost", 5001); Multiaddr address2 = Multiaddr.fromString("/ip4/127.0.0.1/tcp/4001/p2p/" + kubo.id().get("ID")); diff --git a/src/test/java/org/peergos/P2pHttpChatTest.java b/src/test/java/org/peergos/P2pHttpChatTest.java index bb4db223..5f3eb314 100644 --- a/src/test/java/org/peergos/P2pHttpChatTest.java +++ b/src/test/java/org/peergos/P2pHttpChatTest.java @@ -25,7 +25,7 @@ public void p2pHttpChat() { h.accept(replyOk.retain()); }); HostBuilder builder1 = HostBuilder.create(TestPorts.getPort(), - new RamProviderStore(1000), new RamRecordStore(), new RamBlockstore(), (c, p, a) -> CompletableFuture.completedFuture(true), false) + new RamProviderStore(1000), new RamRecordStore(), new RamBlockstore(), (c, p, a) -> CompletableFuture.completedFuture(true), false, false) .addProtocol(node1Http); Host node1 = builder1.build(); HttpProtocol.Binding node2Http = new HttpProtocol.Binding((s, req, h) -> { @@ -34,7 +34,7 @@ public void p2pHttpChat() { h.accept(replyOk.retain()); }); HostBuilder builder2 = HostBuilder.create(TestPorts.getPort(), - new RamProviderStore(1000), new RamRecordStore(), new RamBlockstore(), (c, p, a) -> CompletableFuture.completedFuture(true), false) + new RamProviderStore(1000), new RamRecordStore(), new RamBlockstore(), (c, p, a) -> CompletableFuture.completedFuture(true), false, false) .addProtocol(node2Http); Host node2 = builder2.build(); node1.start().join(); @@ -75,6 +75,7 @@ public void p2pHttpChat() { node2.stop(); } } + public static void printBody(HttpRequest req) { if (req instanceof FullHttpRequest) { ByteBuf content = ((FullHttpRequest) req).content(); diff --git a/src/test/java/org/peergos/P2pHttpTest.java b/src/test/java/org/peergos/P2pHttpTest.java index 7282464d..586b0db3 100644 --- a/src/test/java/org/peergos/P2pHttpTest.java +++ b/src/test/java/org/peergos/P2pHttpTest.java @@ -37,7 +37,7 @@ public void p2pTest() { h.accept(emptyReply.retain()); }); HostBuilder builder1 = HostBuilder.create(TestPorts.getPort(), - new RamProviderStore(1000), new RamRecordStore(), new RamBlockstore(), (c, p, a) -> CompletableFuture.completedFuture(true), false); + new RamProviderStore(1000), new RamRecordStore(), new RamBlockstore(), (c, p, a) -> CompletableFuture.completedFuture(true), false, false); builder1 = builder1.addProtocol(node1Http); Host node1 = builder1.build(); node1.start().join(); @@ -73,7 +73,7 @@ public void p2pTest() { HttpProtocol.proxyRequest(req, new InetSocketAddress("127.0.0.1", localPort), h); }); HostBuilder builder2 = HostBuilder.create(TestPorts.getPort(), - new RamProviderStore(1000), new RamRecordStore(), blockstore2, (c, p, a) -> CompletableFuture.completedFuture(true), false); + new RamProviderStore(1000), new RamRecordStore(), blockstore2, (c, p, a) -> CompletableFuture.completedFuture(true), false, false); builder2 = builder2.addProtocol(node2Http); Host node2 = builder2.build(); node2.start().join(); diff --git a/src/test/java/org/peergos/PingTest.java b/src/test/java/org/peergos/PingTest.java index 637a39a4..d52056c3 100644 --- a/src/test/java/org/peergos/PingTest.java +++ b/src/test/java/org/peergos/PingTest.java @@ -1,21 +1,28 @@ package org.peergos; -import identify.pb.*; -import io.ipfs.multiaddr.*; +import identify.pb.IdentifyOuterClass; +import io.ipfs.multiaddr.MultiAddress; import io.ipfs.multihash.Multihash; -import io.libp2p.core.*; -import io.libp2p.core.crypto.*; -import io.libp2p.core.multiformats.*; -import io.libp2p.core.multistream.*; -import io.libp2p.crypto.keys.*; -import io.libp2p.protocol.*; -import org.junit.*; -import org.peergos.blockstore.*; -import org.peergos.protocol.*; -import org.peergos.protocol.bitswap.*; +import io.libp2p.core.Host; +import io.libp2p.core.crypto.PrivKey; +import io.libp2p.core.multiformats.Multiaddr; +import io.libp2p.core.multistream.ProtocolBinding; +import io.libp2p.crypto.keys.Ed25519Kt; +import io.libp2p.crypto.keys.RsaKt; +import io.libp2p.protocol.Identify; +import io.libp2p.protocol.IdentifyController; +import io.libp2p.protocol.Ping; +import io.libp2p.protocol.PingController; +import org.junit.Assert; +import org.junit.Test; +import org.peergos.blockstore.RamBlockstore; +import org.peergos.protocol.IdentifyBuilder; +import org.peergos.protocol.bitswap.Bitswap; +import org.peergos.protocol.bitswap.BitswapEngine; -import java.util.*; -import java.util.concurrent.*; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; public class PingTest { @@ -62,8 +69,8 @@ public void runPingEd25519ToRSA() { new Bitswap(new BitswapEngine(new RamBlockstore(), (c, p, a) -> CompletableFuture.completedFuture(true), Bitswap.MAX_MESSAGE_SIZE)))); node1.start().join(); node2.start().join(); - IdentifyBuilder.addIdentifyProtocol(node1); - IdentifyBuilder.addIdentifyProtocol(node2); + IdentifyBuilder.addIdentifyProtocol(node1, Collections.emptyList()); + IdentifyBuilder.addIdentifyProtocol(node2, Collections.emptyList()); Assert.assertTrue(new Multihash(Multihash.Type.id, node1Keys.publicKey().bytes()).toString().equals(node1.getPeerId().toString())); Assert.assertTrue(new Multihash(Multihash.Type.sha2_256, Hash.sha256(node2Keys.publicKey().bytes())).toString().equals(node2.getPeerId().toString())); @@ -87,9 +94,9 @@ public void replyIdentifyOnNewDial() { Host node1 = HostBuilder.build(TestPorts.getPort(), List.of(new Ping(), new Bitswap(new BitswapEngine(new RamBlockstore(), (c, p, a) -> CompletableFuture.completedFuture(true), Bitswap.MAX_MESSAGE_SIZE)))); Host node2 = HostBuilder.build(TestPorts.getPort(), List.of(new Ping(), new Bitswap(new BitswapEngine(new RamBlockstore(), (c, p, a) -> CompletableFuture.completedFuture(true), Bitswap.MAX_MESSAGE_SIZE)))); node1.start().join(); - IdentifyBuilder.addIdentifyProtocol(node1); + IdentifyBuilder.addIdentifyProtocol(node1, Collections.emptyList()); node2.start().join(); - IdentifyBuilder.addIdentifyProtocol(node2); + IdentifyBuilder.addIdentifyProtocol(node2, Collections.emptyList()); try { // ping from 1 to 2 Multiaddr address2 = node2.listenAddresses().get(0); diff --git a/src/test/java/org/peergos/ProvideTest.java b/src/test/java/org/peergos/ProvideTest.java index dff9ad9f..b13c338f 100644 --- a/src/test/java/org/peergos/ProvideTest.java +++ b/src/test/java/org/peergos/ProvideTest.java @@ -1,17 +1,20 @@ package org.peergos; -import io.ipfs.cid.*; -import io.ipfs.multiaddr.*; -import io.ipfs.multihash.*; -import io.libp2p.core.*; -import org.junit.*; -import org.peergos.blockstore.*; -import org.peergos.protocol.dht.*; +import io.ipfs.cid.Cid; +import io.ipfs.multiaddr.MultiAddress; +import io.ipfs.multihash.Multihash; +import io.libp2p.core.Host; +import org.junit.Ignore; +import org.junit.Test; +import org.peergos.blockstore.RamBlockstore; +import org.peergos.protocol.dht.Kademlia; +import org.peergos.protocol.dht.RamProviderStore; +import org.peergos.protocol.dht.RamRecordStore; -import java.util.*; -import java.util.concurrent.*; -import java.util.function.*; -import java.util.stream.*; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.Predicate; +import java.util.stream.Collectors; public class ProvideTest { @@ -20,7 +23,7 @@ public class ProvideTest { public void provideBlock() { RamBlockstore blockstore = new RamBlockstore(); HostBuilder builder1 = HostBuilder.create(TestPorts.getPort(), - new RamProviderStore(1000), new RamRecordStore(), blockstore, (c, p, a) -> CompletableFuture.completedFuture(true), false); + new RamProviderStore(1000), new RamRecordStore(), blockstore, (c, p, a) -> CompletableFuture.completedFuture(true), false, false); Host node1 = builder1.build(); node1.start().join(); Multihash node1Id = Multihash.deserialize(node1.getPeerId().getBytes()); diff --git a/src/test/java/org/peergos/RelayTest.java b/src/test/java/org/peergos/RelayTest.java index f66790d6..8e27328d 100644 --- a/src/test/java/org/peergos/RelayTest.java +++ b/src/test/java/org/peergos/RelayTest.java @@ -1,17 +1,24 @@ package org.peergos; -import io.ipfs.multiaddr.*; -import io.libp2p.core.*; -import io.libp2p.core.multiformats.*; -import org.junit.*; -import org.peergos.blockstore.*; -import org.peergos.protocol.circuit.*; -import org.peergos.protocol.dht.*; +import io.ipfs.multiaddr.MultiAddress; +import io.libp2p.core.Host; +import io.libp2p.core.PeerId; +import io.libp2p.core.multiformats.Multiaddr; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; +import org.peergos.blockstore.RamBlockstore; +import org.peergos.protocol.circuit.CircuitHopProtocol; +import org.peergos.protocol.circuit.Relay; +import org.peergos.protocol.dht.Kademlia; +import org.peergos.protocol.dht.RamProviderStore; +import org.peergos.protocol.dht.RamRecordStore; -import java.util.*; -import java.util.concurrent.*; -import java.util.function.*; -import java.util.stream.*; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.function.Predicate; +import java.util.stream.Collectors; public class RelayTest { @@ -19,11 +26,11 @@ public class RelayTest { @Ignore // needs fixed find providers public void relay() { HostBuilder builder1 = HostBuilder.create(10000 + new Random().nextInt(50000), - new RamProviderStore(1000), new RamRecordStore(), new RamBlockstore(), (c, p, a) -> CompletableFuture.completedFuture(true), false); + new RamProviderStore(1000), new RamRecordStore(), new RamBlockstore(), (c, p, a) -> CompletableFuture.completedFuture(true),false, false); Host node1 = builder1.build(); node1.start().join(); HostBuilder builder2 = HostBuilder.create(10000 + new Random().nextInt(50000), - new RamProviderStore(1000), new RamRecordStore(), new RamBlockstore(), (c, p, a) -> CompletableFuture.completedFuture(true), false); + new RamProviderStore(1000), new RamRecordStore(), new RamBlockstore(), (c, p, a) -> CompletableFuture.completedFuture(true),false, false); Host node2 = builder2.build(); node2.start().join(); diff --git a/src/test/java/org/peergos/blockstore/BloomTest.java b/src/test/java/org/peergos/blockstore/BloomTest.java index 6ef62c5f..5ffa771c 100644 --- a/src/test/java/org/peergos/blockstore/BloomTest.java +++ b/src/test/java/org/peergos/blockstore/BloomTest.java @@ -33,6 +33,10 @@ public void bloom() { long t4 = System.currentTimeMillis(); System.out.println("Doubling filter size took: " + (t4-t3)+ "ms"); + for (Cid ref : bs.refs(false).join()) { + Assert.assertTrue(filtered.has(ref).join()); + } + checkFalsePositiveRate(bloom, 14); }