diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml
index 96c7d573..2b6a211b 100644
--- a/.github/workflows/maven.yml
+++ b/.github/workflows/maven.yml
@@ -8,10 +8,10 @@ jobs:
runs-on: ubuntu-latest
steps:
- - uses: actions/checkout@v3
+ - uses: actions/checkout@v4
- uses: ./.github/actions/local-s3
- name: Set up JDK 11
- uses: actions/setup-java@v3
+ uses: actions/setup-java@v4
with:
distribution: temurin
java-version: 11
diff --git a/README.md b/README.md
index 9ef6349c..fadf89d1 100644
--- a/README.md
+++ b/README.md
@@ -29,6 +29,7 @@ Currently implemented properties:
* bloom/[infini filtered](https://www.rasmuspagh.net/papers/infinifilter.pdf) blockstore
* connect bitswap to kademlia for discovery, with a faster version with supplied peerids
* configurable cid publishing function
+* mDNS peer discovery
* Android compatibility
* example serverless chat app using p2p http proxy for Android
* interop tests with other implementations - https://github.com/libp2p/test-plans/
@@ -37,7 +38,6 @@ In the future we will add:
* circuit-relay
* dcutr (direct connection upgrade through relay)
* AutoRelay
-* mDNS peer discovery
* example iOS chat app
* QUIC transport (and encryption and multiplexing)
diff --git a/pom.xml b/pom.xml
index 4bcfd4b4..19584575 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
com.github.peergos
nabu
- v0.7.9
+ v0.8.0
UTF-8
@@ -16,7 +16,7 @@
2.2
v1.4.12
v1.4.4
- 1.9.10
+ 2.1.0
@@ -40,35 +40,31 @@
-
- org.jetbrains.kotlin
- kotlin-maven-plugin
- ${kotlin.version}
- true
-
org.apache.maven.plugins
- maven-dependency-plugin
- 2.6
+ maven-assembly-plugin
+ 3.1.1
+
+
+
+ jar-with-dependencies
+
+
+
- unpack-dependencies
+ make-assembly
package
- unpack-dependencies
+ single
-
- system
- META-INF/*.SF,META-INF/*.DSA,META-INF/*.RSA
- ${project.build.directory}/classes
-
- org.apache.maven.plugins
- maven-compiler-plugin
- 3.10.1
+ org.jetbrains.kotlin
+ kotlin-maven-plugin
+ ${kotlin.version}
compile
@@ -76,60 +72,51 @@
compile
+
+
+
+
+
+
- testCompile
+ test-compile
test-compile
- testCompile
+ test-compile
+
+ ${maven.compiler.target}
+
org.apache.maven.plugins
- maven-assembly-plugin
- 3.1.1
-
-
-
- jar-with-dependencies
-
-
-
+ maven-compiler-plugin
+ 3.10.1
- make-assembly
- package
+ compile
+ compile
- single
+ compile
-
-
-
- org.apache.maven.plugins
- maven-shade-plugin
- 3.3.0
-
- package
+ testCompile
+ test-compile
- shade
+ testCompile
-
- false
-
-
- *:*
-
- META-INF/*.SF
- META-INF/*.DSA
- META-INF/*.RSA
-
-
-
-
+
+
+ default-compile
+ none
+
+
+ default-testCompile
+ none
@@ -139,7 +126,7 @@
org.jetbrains.kotlin
- kotlin-stdlib
+ kotlin-stdlib-jdk8
${kotlin.version}
@@ -162,7 +149,7 @@
com.github.peergos
jvm-libp2p
- 0.16.6
+ 0.18.0-ipv6-mdns-wildcard
org.slf4j
diff --git a/src/main/java/org/peergos/EmbeddedIpfs.java b/src/main/java/org/peergos/EmbeddedIpfs.java
index 1050b8b6..83bf987b 100644
--- a/src/main/java/org/peergos/EmbeddedIpfs.java
+++ b/src/main/java/org/peergos/EmbeddedIpfs.java
@@ -1,40 +1,64 @@
package org.peergos;
-import io.ipfs.cid.*;
-import io.ipfs.multiaddr.*;
+import io.ipfs.cid.Cid;
+import io.ipfs.multiaddr.MultiAddress;
import io.ipfs.multihash.Multihash;
-import io.libp2p.core.*;
-import io.libp2p.core.crypto.*;
-import io.libp2p.core.multiformats.*;
-import io.libp2p.core.multistream.*;
-import io.libp2p.protocol.*;
-import org.peergos.blockstore.*;
+import io.libp2p.core.AddressBook;
+import io.libp2p.core.Host;
+import io.libp2p.core.PeerId;
+import io.libp2p.core.crypto.PrivKey;
+import io.libp2p.core.crypto.PubKey;
+import io.libp2p.core.multiformats.Multiaddr;
+import io.libp2p.core.multistream.ProtocolBinding;
+import io.libp2p.discovery.MDnsDiscovery;
+import io.libp2p.protocol.Ping;
+import org.peergos.blockstore.Blockstore;
+import org.peergos.blockstore.FileBlockstore;
+import org.peergos.blockstore.FilteredBlockstore;
+import org.peergos.blockstore.ProvidingBlockstore;
+import org.peergos.blockstore.TypeLimitedBlockstore;
import org.peergos.blockstore.metadatadb.BlockMetadataStore;
-import org.peergos.blockstore.metadatadb.JdbcBlockMetadataStore;
import org.peergos.blockstore.metadatadb.CachingBlockMetadataStore;
+import org.peergos.blockstore.metadatadb.JdbcBlockMetadataStore;
import org.peergos.blockstore.metadatadb.sql.H2BlockMetadataCommands;
import org.peergos.blockstore.metadatadb.sql.UncloseableConnection;
import org.peergos.blockstore.s3.S3Blockstore;
-import org.peergos.config.*;
+import org.peergos.config.Config;
+import org.peergos.config.FilterType;
+import org.peergos.config.IdentitySection;
import org.peergos.net.ConnectionException;
-import org.peergos.protocol.*;
-import org.peergos.protocol.autonat.*;
-import org.peergos.protocol.bitswap.*;
-import org.peergos.protocol.circuit.*;
-import org.peergos.protocol.dht.*;
-import org.peergos.protocol.http.*;
-import org.peergos.protocol.ipns.*;
+import org.peergos.protocol.IdentifyBuilder;
+import org.peergos.protocol.autonat.AutonatProtocol;
+import org.peergos.protocol.bitswap.Bitswap;
+import org.peergos.protocol.bitswap.BitswapEngine;
+import org.peergos.protocol.circuit.CircuitHopProtocol;
+import org.peergos.protocol.circuit.CircuitStopProtocol;
+import org.peergos.protocol.dht.Kademlia;
+import org.peergos.protocol.dht.KademliaController;
+import org.peergos.protocol.dht.KademliaEngine;
+import org.peergos.protocol.dht.ProviderStore;
+import org.peergos.protocol.dht.RamProviderStore;
+import org.peergos.protocol.dht.RecordStore;
+import org.peergos.protocol.http.HttpProtocol;
+import org.peergos.protocol.ipns.IPNS;
+import org.peergos.protocol.ipns.IpnsRecord;
import org.peergos.util.Logging;
-import java.nio.file.*;
+import java.nio.file.Path;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
-import java.time.*;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.logging.*;
-import java.util.stream.*;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.CompletableFuture;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
public class EmbeddedIpfs {
private static final Logger LOG = Logging.LOG();
@@ -49,6 +73,8 @@ public class EmbeddedIpfs {
public final Optional p2pHttp;
private final List bootstrap;
private final Optional blockProvider;
+ private final List announce;
+ private final List mdns = new ArrayList<>();
public EmbeddedIpfs(Host node,
Blockstore blockstore,
@@ -57,7 +83,8 @@ public EmbeddedIpfs(Host node,
Bitswap bitswap,
Optional p2pHttp,
List bootstrap,
- Optional> newBlockProvider) {
+ Optional> newBlockProvider,
+ List announce) {
this.node = node;
this.blockstore = blockstore;
this.records = records;
@@ -68,6 +95,7 @@ public EmbeddedIpfs(Host node,
this.blocks = new BitswapBlockService(node, bitswap, dht);
this.blockProvider = newBlockProvider.map(q -> new PeriodicBlockProvider(22 * 3600_000L,
() -> blockstore.refs(false).join().stream(), node, dht, q));
+ this.announce = announce;
}
public int maxBlockSize() {
@@ -138,12 +166,15 @@ public void start() {
try {
this.stop().join();
} catch (Exception ex) {
- ex.printStackTrace();
+ LOG.info(ex.getMessage());
}
});
Runtime.getRuntime().addShutdownHook(shutdownHook);
node.start().join();
- IdentifyBuilder.addIdentifyProtocol(node);
+ IdentifyBuilder.addIdentifyProtocol(node, announce.stream()
+ .map(MultiAddress::toString)
+ .map(Multiaddr::new)
+ .collect(Collectors.toList()));
LOG.info("Node started and listening on " + node.listenAddresses());
LOG.info("Bootstrapping IPFS routing table");
if (bootstrap.isEmpty())
@@ -153,15 +184,34 @@ public void start() {
dht.bootstrap(node);
dht.startBootstrapThread(node);
- blockProvider.ifPresent(p -> p.start());
+ LOG.info("MDNS discovery enabled");
+ MDnsDiscovery mdns = new MDnsDiscovery(node, "_ipfs-discovery._udp.local.", 60, null);
+ this.mdns.add(mdns);
+ mdns.addHandler(peerInfo -> {
+ PeerId remote = PeerId.fromBase58(peerInfo.getPeerId().toBase58().substring(1)); // Not sure what's wrong with peerInfo, but this works
+ if (!remote.equals(node.getPeerId())) {
+ LOG.info(node.getPeerId() + " found local peer: " + remote.toBase58() + ", addrs: " + peerInfo.getAddresses());
+ Multiaddr[] remoteAddrs = peerInfo.getAddresses().toArray(new Multiaddr[0]);
+ KademliaController ctr = dht.dial(node, remote, remoteAddrs).getController().join();
+ ctr.closerPeers(node.getPeerId().getBytes()).join();
+ node.getAddressBook().addAddrs(remote, 0, remoteAddrs);
+ }
+ return null;
+ });
+ mdns.start();
+
+ blockProvider.ifPresent(PeriodicBlockProvider::start);
}
public CompletableFuture stop() throws Exception {
if (records != null) {
records.close();
}
- blockProvider.ifPresent(b -> b.stop());
+ blockProvider.ifPresent(PeriodicBlockProvider::stop);
dht.stopBootstrapThread();
+ for (MDnsDiscovery m : mdns) {
+ m.stop();
+ }
return node != null ? node.stop() : CompletableFuture.completedFuture(null);
}
@@ -179,6 +229,7 @@ public static BlockMetadataStore buildBlockMetadata(Args a) {
throw new RuntimeException(e);
}
}
+
public static Blockstore buildBlockStore(Config config, Path ipfsPath, BlockMetadataStore meta, boolean updateMetadb) {
Blockstore withMetadb;
if (config.datastore.blockMount.prefix.equals("flatfs.datastore")) {
@@ -206,9 +257,9 @@ public static Blockstore typeLimited(Blockstore blocks, Config config) {
public static Blockstore filteredBlockStore(Blockstore blocks, Config config) {
if (config.datastore.filter.type == FilterType.BLOOM) {
return FilteredBlockstore.bloomBased(blocks, config.datastore.filter.falsePositiveRate);
- } else if(config.datastore.filter.type == FilterType.INFINI) {
+ } else if (config.datastore.filter.type == FilterType.INFINI) {
return FilteredBlockstore.infiniBased(blocks, config.datastore.filter.falsePositiveRate);
- } else if(config.datastore.filter.type == FilterType.NONE) {
+ } else if (config.datastore.filter.type == FilterType.NONE) {
return blocks;
} else {
throw new IllegalStateException("Unhandled filter type: " + config.datastore.filter.type);
@@ -224,8 +275,8 @@ public static EmbeddedIpfs build(RecordStore records,
BlockRequestAuthoriser authoriser,
Optional handler,
boolean localEnabled) {
- return build(records, blocks, provideBlocks, swarmAddresses, bootstrap, identity, authoriser, handler, localEnabled,
- Optional.empty(), Optional.empty());
+ return build(records, blocks, provideBlocks, swarmAddresses, bootstrap, identity, Collections.emptyList(),
+ authoriser, handler, localEnabled, Optional.empty(), Optional.empty());
}
public static EmbeddedIpfs build(RecordStore records,
@@ -234,6 +285,7 @@ public static EmbeddedIpfs build(RecordStore records,
List swarmAddresses,
List bootstrap,
IdentitySection identity,
+ List announce,
BlockRequestAuthoriser authoriser,
Optional handler,
boolean localEnabled,
@@ -245,7 +297,7 @@ public static EmbeddedIpfs build(RecordStore records,
ProviderStore providers = new RamProviderStore(10_000);
HostBuilder builder = new HostBuilder().setIdentity(identity.privKeyProtobuf).listen(swarmAddresses);
- if (! builder.getPeerId().equals(identity.peerId)) {
+ if (!builder.getPeerId().equals(identity.peerId)) {
throw new IllegalStateException("PeerId invalid");
}
Multihash ourPeerId = Multihash.deserialize(builder.getPeerId().getBytes());
@@ -268,9 +320,9 @@ public static EmbeddedIpfs build(RecordStore records,
Host node = builder.addProtocols(protocols).build();
Optional> newBlockProvider = provideBlocks ?
- Optional.of(((ProvidingBlockstore)blockstore).toPublish) :
+ Optional.of(((ProvidingBlockstore) blockstore).toPublish) :
Optional.empty();
- return new EmbeddedIpfs(node, blockstore, records, dht, bitswap, httpHandler, bootstrap, newBlockProvider);
+ return new EmbeddedIpfs(node, blockstore, records, dht, bitswap, httpHandler, bootstrap, newBlockProvider, announce);
}
public static Multiaddr[] getAddresses(Host node, Kademlia dht, Multihash targetNodeId) throws ConnectionException {
diff --git a/src/main/java/org/peergos/HostBuilder.java b/src/main/java/org/peergos/HostBuilder.java
index 58621917..7c19121c 100644
--- a/src/main/java/org/peergos/HostBuilder.java
+++ b/src/main/java/org/peergos/HostBuilder.java
@@ -1,25 +1,40 @@
package org.peergos;
-import io.ipfs.multiaddr.*;
+import io.ipfs.multiaddr.MultiAddress;
import io.ipfs.multihash.Multihash;
-import io.libp2p.core.*;
-import io.libp2p.core.crypto.*;
-import io.libp2p.core.dsl.*;
-import io.libp2p.core.multiformats.*;
-import io.libp2p.core.multistream.*;
-import io.libp2p.core.mux.*;
-import io.libp2p.crypto.keys.*;
-import io.libp2p.protocol.*;
-import io.libp2p.security.noise.*;
-import io.libp2p.transport.tcp.*;
+import io.libp2p.core.ConnectionHandler;
+import io.libp2p.core.Host;
+import io.libp2p.core.PeerId;
+import io.libp2p.core.StreamPromise;
import io.libp2p.core.crypto.KeyKt;
-import org.peergos.blockstore.*;
-import org.peergos.protocol.autonat.*;
-import org.peergos.protocol.bitswap.*;
-import org.peergos.protocol.circuit.*;
-import org.peergos.protocol.dht.*;
-import java.util.*;
-import java.util.stream.*;
+import io.libp2p.core.crypto.PrivKey;
+import io.libp2p.core.dsl.Builder;
+import io.libp2p.core.dsl.BuilderJKt;
+import io.libp2p.core.multiformats.Multiaddr;
+import io.libp2p.core.multistream.ProtocolBinding;
+import io.libp2p.core.mux.StreamMuxerProtocol;
+import io.libp2p.crypto.keys.Ed25519Kt;
+import io.libp2p.protocol.IdentifyBinding;
+import io.libp2p.protocol.IdentifyController;
+import io.libp2p.protocol.IdentifyProtocol;
+import io.libp2p.protocol.Ping;
+import io.libp2p.security.noise.NoiseXXSecureChannel;
+import io.libp2p.transport.tcp.TcpTransport;
+import org.peergos.blockstore.Blockstore;
+import org.peergos.protocol.autonat.AutonatProtocol;
+import org.peergos.protocol.bitswap.Bitswap;
+import org.peergos.protocol.bitswap.BitswapEngine;
+import org.peergos.protocol.circuit.CircuitHopProtocol;
+import org.peergos.protocol.circuit.CircuitStopProtocol;
+import org.peergos.protocol.dht.Kademlia;
+import org.peergos.protocol.dht.KademliaEngine;
+import org.peergos.protocol.dht.ProviderStore;
+import org.peergos.protocol.dht.RecordStore;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
public class HostBuilder {
private PrivKey privKey;
@@ -46,21 +61,21 @@ public List getProtocols() {
public Optional getWanDht() {
return protocols.stream()
.filter(p -> p instanceof Kademlia && p.getProtocolDescriptor().getAnnounceProtocols().contains("/ipfs/kad/1.0.0"))
- .map(p -> (Kademlia)p)
+ .map(p -> (Kademlia) p)
.findFirst();
}
public Optional getBitswap() {
return protocols.stream()
.filter(p -> p instanceof Bitswap)
- .map(p -> (Bitswap)p)
+ .map(p -> (Bitswap) p)
.findFirst();
}
public Optional getRelayHop() {
return protocols.stream()
.filter(p -> p instanceof CircuitHopProtocol.Binding)
- .map(p -> (CircuitHopProtocol.Binding)p)
+ .map(p -> (CircuitHopProtocol.Binding) p)
.findFirst();
}
@@ -93,7 +108,6 @@ public HostBuilder setIdentity(byte[] privKey) {
}
-
public HostBuilder setPrivKey(PrivKey privKey) {
this.privKey = privKey;
this.peerId = PeerId.fromPubKey(privKey.publicKey());
@@ -182,7 +196,7 @@ public static Host build(PrivKey privKey,
if (connection.isInitiator())
return;
addrs.getAddrs(remotePeer).thenAccept(existing -> {
- if (! existing.isEmpty())
+ if (!existing.isEmpty())
return;
StreamPromise stream = connection.muxerSession()
.createStream(new IdentifyBinding(new IdentifyProtocol()));
@@ -217,7 +231,7 @@ public static Host build(PrivKey privKey,
});
for (ProtocolBinding protocol : protocols) {
if (protocol instanceof HostConsumer)
- ((HostConsumer)protocol).setHost(host);
+ ((HostConsumer) protocol).setHost(host);
}
return host;
}
diff --git a/src/main/java/org/peergos/IpnsPublisher.java b/src/main/java/org/peergos/IpnsPublisher.java
index 77f5b3d7..0f9aeacd 100644
--- a/src/main/java/org/peergos/IpnsPublisher.java
+++ b/src/main/java/org/peergos/IpnsPublisher.java
@@ -1,27 +1,39 @@
package org.peergos;
-import io.ipfs.multiaddr.*;
-import io.ipfs.multihash.*;
-import io.libp2p.core.*;
-import io.libp2p.core.crypto.*;
-import io.libp2p.crypto.keys.*;
-import org.peergos.blockstore.*;
-import org.peergos.config.*;
-import org.peergos.protocol.dht.*;
-import org.peergos.protocol.ipns.*;
-import org.peergos.util.*;
+import io.ipfs.multiaddr.MultiAddress;
+import io.ipfs.multihash.Multihash;
+import io.libp2p.core.PeerId;
+import io.libp2p.core.crypto.KeyKt;
+import io.libp2p.core.crypto.PrivKey;
+import io.libp2p.crypto.keys.Ed25519Kt;
+import org.peergos.blockstore.RamBlockstore;
+import org.peergos.config.Config;
+import org.peergos.config.IdentitySection;
+import org.peergos.protocol.dht.RamRecordStore;
+import org.peergos.protocol.ipns.IPNS;
+import org.peergos.protocol.ipns.IpnsRecord;
+import org.peergos.util.ArrayOps;
-import java.io.*;
-import java.nio.file.*;
-import java.time.*;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-import java.util.stream.*;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import java.util.stream.Stream;
public class IpnsPublisher {
private static final ExecutorService ioExec = Executors.newFixedThreadPool(50);
+
public static void main(String[] a) throws Exception {
Path publishFile = Paths.get("publishers.txt");
EmbeddedIpfs publisher = startIpfs();
@@ -36,7 +48,7 @@ public static void main(String[] a) throws Exception {
.collect(Collectors.toList());
System.out.println("Resolving " + records.size() + " keys");
- for (int c=0; c < 100; c++) {
+ for (int c = 0; c < 100; c++) {
long t0 = System.currentTimeMillis();
List recordCounts = resolveAndRepublish(records, resolver, publisher);
Path resultsFile = Paths.get("publish-resolve-counts-" + LocalDateTime.now().withNano(0) + ".txt");
@@ -75,7 +87,7 @@ public static void main(String[] a) throws Exception {
System.out.println("published " + done + " / " + keycount);
}
long t1 = System.currentTimeMillis();
- System.out.println("Published all in " + (t1-t0)/1000 + "s");
+ System.out.println("Published all in " + (t1 - t0) / 1000 + "s");
publisher.stop().join();
}
System.exit(0);
diff --git a/src/main/java/org/peergos/PeerAddresses.java b/src/main/java/org/peergos/PeerAddresses.java
index f2afb379..c2efe8c8 100644
--- a/src/main/java/org/peergos/PeerAddresses.java
+++ b/src/main/java/org/peergos/PeerAddresses.java
@@ -15,6 +15,7 @@
import java.util.*;
import java.util.function.*;
import java.util.stream.*;
+import java.util.stream.Stream;
public class PeerAddresses {
public final Multihash peerId;
@@ -57,7 +58,13 @@ public static PeerAddresses fromProtobuf(Dht.Message.Peer peer) {
Multihash peerId = Multihash.deserialize(peer.getId().toByteArray());
List addrs = peer.getAddrsList()
.stream()
- .map(b -> Multiaddr.deserialize(b.toByteArray()))
+ .flatMap(b -> {
+ try {
+ return Stream.of(Multiaddr.deserialize(b.toByteArray()));
+ } catch (Exception e) {
+ return Stream.empty();
+ }
+ })
.collect(Collectors.toList());
return new PeerAddresses(peerId, addrs);
}
diff --git a/src/main/java/org/peergos/RamAddressBook.java b/src/main/java/org/peergos/RamAddressBook.java
index b871d27b..8de76b9f 100644
--- a/src/main/java/org/peergos/RamAddressBook.java
+++ b/src/main/java/org/peergos/RamAddressBook.java
@@ -1,19 +1,28 @@
package org.peergos;
-import io.libp2p.core.*;
-import io.libp2p.core.multiformats.*;
-import org.jetbrains.annotations.*;
-import org.peergos.util.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.stream.*;
+import io.libp2p.core.AddressBook;
+import io.libp2p.core.PeerId;
+import io.libp2p.core.multiformats.Multiaddr;
+import io.libp2p.core.multiformats.MultiaddrComponent;
+import io.libp2p.core.multiformats.Protocol;
+import org.jetbrains.annotations.NotNull;
+import org.peergos.util.LRUCache;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
public class RamAddressBook implements AddressBook {
- // TODO: Does this need to be public?
- public Map> addresses;
+ private final Map> addresses;
public RamAddressBook() {
addresses = Collections.synchronizedMap(new LRUCache<>(10_000));
diff --git a/src/main/java/org/peergos/blockstore/CidInfiniFilter.java b/src/main/java/org/peergos/blockstore/CidInfiniFilter.java
index 0d6327be..4ac10e83 100644
--- a/src/main/java/org/peergos/blockstore/CidInfiniFilter.java
+++ b/src/main/java/org/peergos/blockstore/CidInfiniFilter.java
@@ -35,13 +35,18 @@ public static CidInfiniFilter build(Blockstore bs) {
public static CidInfiniFilter build(Blockstore bs, double falsePositiveRate) {
List refs = bs.refs(false).join();
int nBlocks = refs.size()*5/4; // increase by 25% to avoid expansion during build
+ CidInfiniFilter infini = build(nBlocks, falsePositiveRate);
+ refs.forEach(c -> infini.add(c));
+ return infini;
+ }
+
+ public static CidInfiniFilter build(int nBlocks, double falsePositiveRate) {
int nextPowerOfTwo = Math.max(17, (int) (1 + Math.log(nBlocks) / Math.log(2)));
double expansionAlpha = 0.8;
int bitsPerEntry = (int)(4 - Math.log(falsePositiveRate / expansionAlpha) / Math.log(2) + 1);
LOG.info("Using infini filter of initial size " + ((double)(bitsPerEntry * (1 << nextPowerOfTwo) / 8) / 1024 / 1024) + " MiB");
ChainedInfiniFilter infini = new ChainedInfiniFilter(nextPowerOfTwo, bitsPerEntry);
infini.set_expand_autonomously(true);
- refs.forEach(c -> infini.insert(c.toBytes(), true));
return new CidInfiniFilter(infini);
}
}
diff --git a/src/main/java/org/peergos/blockstore/FilteredBlockstore.java b/src/main/java/org/peergos/blockstore/FilteredBlockstore.java
index ab58a2bd..e2ef0adc 100644
--- a/src/main/java/org/peergos/blockstore/FilteredBlockstore.java
+++ b/src/main/java/org/peergos/blockstore/FilteredBlockstore.java
@@ -8,27 +8,48 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.*;
public class FilteredBlockstore implements Blockstore {
private final Blockstore blocks;
- private final Filter filter;
+ private final Filter present;
+ private volatile Filter absent;
+ private final AtomicLong absentCount = new AtomicLong(0);
- public FilteredBlockstore(Blockstore blocks, Filter filter) {
+ public FilteredBlockstore(Blockstore blocks, Filter present) {
this.blocks = blocks;
- this.filter = filter;
+ this.present = present;
+ this.absent = buildAbsentFilter();
}
public CompletableFuture bloomAdd(Cid cid) {
- filter.add(cid);
+ present.add(cid);
return CompletableFuture.completedFuture(true);
}
+ private static Filter buildAbsentFilter() {
+ return CidInfiniFilter.build(1_000, 0.001);
+ }
+
+ private void addAbsentBlock(Cid c) {
+ if (absentCount.get() > 10_000) {
+ absentCount.set(0);
+ absent = buildAbsentFilter();
+ }
+ absentCount.incrementAndGet();
+ absent.add(c);
+ }
+
@Override
public CompletableFuture has(Cid c) {
- if (filter.has(c))
- return blocks.has(c);
+ if (present.has(c) && ! absent.has(c))
+ return blocks.has(c).thenApply(res -> {
+ if (! res)
+ addAbsentBlock(c);
+ return res;
+ });
return CompletableFuture.completedFuture(false);
}
@@ -40,15 +61,19 @@ public CompletableFuture hasAny(Multihash h) {
@Override
public CompletableFuture> get(Cid c) {
- if (filter.has(c))
- return blocks.get(c);
+ if (present.has(c) && ! absent.has(c)) {
+ return blocks.get(c).exceptionally(t -> {
+ addAbsentBlock(c);
+ return Optional.empty();
+ });
+ }
return CompletableFuture.completedFuture(Optional.empty());
}
@Override
public CompletableFuture put(byte[] block, Cid.Codec codec) {
return blocks.put(block, codec)
- .thenApply(filter::add);
+ .thenApply(present::add);
}
@Override
diff --git a/src/main/java/org/peergos/blockstore/s3/S3Blockstore.java b/src/main/java/org/peergos/blockstore/s3/S3Blockstore.java
index aff3b92d..e80a746a 100644
--- a/src/main/java/org/peergos/blockstore/s3/S3Blockstore.java
+++ b/src/main/java/org/peergos/blockstore/s3/S3Blockstore.java
@@ -10,16 +10,27 @@
import org.peergos.blockstore.metadatadb.BlockMetadata;
import org.peergos.blockstore.metadatadb.BlockMetadataStore;
import org.peergos.cbor.CborObject;
+import org.peergos.util.ArrayOps;
+import org.peergos.util.Futures;
import org.peergos.util.Hasher;
-import org.peergos.util.*;
+import org.peergos.util.HttpUtil;
+import org.peergos.util.Logging;
+import org.peergos.util.Pair;
import javax.net.ssl.SSLException;
-import java.io.*;
+import java.io.FileNotFoundException;
+import java.io.IOException;
import java.net.SocketTimeoutException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.ZonedDateTime;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
@@ -28,7 +39,9 @@
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
-import java.util.stream.*;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
public class S3Blockstore implements Blockstore {
@@ -130,13 +143,16 @@ public S3Blockstore(Map params, BlockMetadataStore blockMetadata
hasher = new Hasher();
LOG.info("Using S3BlockStore");
}
+
private String getHost() {
return bucket + "." + regionEndpoint;
}
+
private boolean useHttps() {
String host = getHost();
- return ! host.endsWith("localhost") && ! host.contains("localhost:");
+ return !host.endsWith("localhost") && !host.contains("localhost:");
}
+
private Map readKeysFromAwsConfig() {
String filePath = System.getenv("HOME") + "/" + ".aws/credentials";
LOG.info("Reading [default] config from: " + filePath);
@@ -146,19 +162,19 @@ private Map readKeysFromAwsConfig() {
boolean foundDefaultSection = false;
boolean foundAccessKey = false;
boolean foundSecretKey = false;
- for(String line : lines) {
+ for (String line : lines) {
String trimmedLine = line.trim();
- if(!foundDefaultSection) {
- if (trimmedLine.equals("[default]")){
+ if (!foundDefaultSection) {
+ if (trimmedLine.equals("[default]")) {
foundDefaultSection = true;
}
} else {
if (trimmedLine.startsWith("[")) {
throw new IllegalStateException("Unable to read expected fields in: " + filePath);
- } else if(trimmedLine.startsWith("aws_access_key_id")) {
+ } else if (trimmedLine.startsWith("aws_access_key_id")) {
params.put("aws_access_key_id", extractParamValue(trimmedLine));
foundAccessKey = true;
- } else if(trimmedLine.startsWith("aws_secret_access_key")) {
+ } else if (trimmedLine.startsWith("aws_secret_access_key")) {
params.put("aws_secret_access_key", extractParamValue(trimmedLine));
foundSecretKey = true;
}
@@ -175,10 +191,11 @@ private Map readKeysFromAwsConfig() {
throw new IllegalStateException("Unable to find aws_secret_access_key");
}
} catch (IOException ioe) {
- throw new IllegalStateException("Unable to read: " + filePath, ioe);
+ throw new IllegalStateException("Unable to read: " + filePath, ioe);
}
return params;
}
+
private String extractParamValue(String line) {
int equalsIndex = line.indexOf("=");
if (equalsIndex == -1) {
@@ -186,16 +203,18 @@ private String extractParamValue(String line) {
}
return line.substring(equalsIndex + 1).trim();
}
+
private String getParam(Map params, String key) {
if (params.containsKey(key)) {
- return ((String )params.get(key)).trim();
+ return ((String) params.get(key)).trim();
} else {
throw new IllegalStateException("Expecting param: " + key);
}
}
+
private String getParam(Map params, String key, String defaultValue) {
if (params.containsKey(key)) {
- return ((String )params.get(key)).trim();
+ return ((String) params.get(key)).trim();
} else {
return defaultValue;
}
@@ -233,7 +252,7 @@ public void updateMetadataStoreIfEmpty() {
ForkJoinPool pool = new ForkJoinPool(updateParallelism);
int batchSize = all.size() / updateParallelism;
AtomicLong progress = new AtomicLong(0);
- int tenth = batchSize/10;
+ int tenth = batchSize / 10;
List>> futures = IntStream.range(0, updateParallelism)
.mapToObj(b -> pool.submit(() -> IntStream.range(b * batchSize, (b + 1) * batchSize)
@@ -256,13 +275,14 @@ public void updateMetadataStoreIfEmpty() {
private static V getWithBackoff(Supplier req) {
long sleep = 100;
- for (int i=0; i < 20; i++) {
+ for (int i = 0; i < 20; i++) {
try {
return req.get();
} catch (RateLimitException e) {
try {
Thread.sleep(sleep);
- } catch (InterruptedException f) {}
+ } catch (InterruptedException f) {
+ }
sleep *= 2;
}
}
@@ -294,7 +314,7 @@ private CompletableFuture> getSizeWithoutRetry(Cid cid) {
Map> headRes = HttpUtil.head(headUrl.base, headUrl.fields);
blockHeads.inc();
long size = Long.parseLong(headRes.get("Content-Length").get(0));
- return Futures.of(Optional.of((int)size));
+ return Futures.of(Optional.of((int) size));
} catch (FileNotFoundException f) {
LOG.warning("S3 404 error reading " + cid);
return Futures.of(Optional.empty());
@@ -306,7 +326,7 @@ private CompletableFuture> getSizeWithoutRetry(Cid cid) {
throw new RateLimitException();
}
boolean notFound = msg.startsWith("NoSuchKey
");
- if (! notFound) {
+ if (!notFound) {
LOG.warning("S3 error reading " + cid);
LOG.log(Level.WARNING, msg, e);
}
@@ -344,12 +364,12 @@ private CompletableFuture> getWithoutRetry(Cid cid) {
throw new RateLimitException();
}
boolean notFound = msg.startsWith("NoSuchKey
");
- if (! notFound) {
+ if (!notFound) {
LOG.warning("S3 error reading " + path);
LOG.log(Level.WARNING, msg, e);
}
failedBlockGets.inc();
- throw new RuntimeException(e.getMessage(), e);
+ return Futures.errored(e);
} finally {
readTimer.observeDuration();
}
@@ -359,6 +379,7 @@ private CompletableFuture> getWithoutRetry(Cid cid) {
public CompletableFuture put(byte[] block, Cid.Codec codec) {
return getWithBackoff(() -> putWithoutRetry(block, codec));
}
+
public CompletableFuture putWithoutRetry(byte[] block, Cid.Codec codec) {
Histogram.Timer writeTimer = writeTimerLog.labels("write").startTimer();
byte[] hash = Hash.sha256(block);
@@ -368,7 +389,7 @@ public CompletableFuture putWithoutRetry(byte[] block, Cid.Codec codec) {
String s3Key = folder + key;
Map extraHeaders = new TreeMap<>();
extraHeaders.put("Content-Type", "application/octet-stream");
- String contentHash = ArrayOps.bytesToHex(hash);
+ String contentHash = ArrayOps.bytesToHex(hash);
PresignedUrl putUrl = S3Request.preSignPut(s3Key, block.length, contentHash, false,
S3AdminRequests.asAwsDate(ZonedDateTime.now()), host, extraHeaders, region, accessKeyId, secretKey, useHttps, hasher).join();
HttpUtil.put(putUrl.base, putUrl.fields, block);
diff --git a/src/main/java/org/peergos/net/APIHandler.java b/src/main/java/org/peergos/net/APIHandler.java
index ad49f8f3..d4f225dd 100755
--- a/src/main/java/org/peergos/net/APIHandler.java
+++ b/src/main/java/org/peergos/net/APIHandler.java
@@ -1,25 +1,39 @@
package org.peergos.net;
+import com.sun.net.httpserver.HttpExchange;
import io.ipfs.cid.Cid;
-import io.ipfs.multihash.*;
+import io.ipfs.multihash.Multihash;
import io.libp2p.core.PeerId;
-import io.libp2p.crypto.keys.*;
-import org.peergos.*;
-import org.peergos.cbor.*;
-import org.peergos.protocol.ipns.*;
-import org.peergos.protocol.ipns.pb.*;
-import org.peergos.util.*;
-import com.sun.net.httpserver.HttpExchange;
+import org.peergos.AggregatedMetrics;
+import org.peergos.EmbeddedIpfs;
+import org.peergos.HashedBlock;
+import org.peergos.PeerAddresses;
+import org.peergos.Want;
+import org.peergos.cbor.CborObject;
+import org.peergos.protocol.ipns.IpnsRecord;
+import org.peergos.protocol.ipns.pb.Ipns;
+import org.peergos.util.ArrayOps;
+import org.peergos.util.HttpUtil;
+import org.peergos.util.JSONParser;
+import org.peergos.util.Logging;
+import org.peergos.util.Version;
-import java.io.*;
-import java.nio.*;
-import java.util.*;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
import java.util.logging.Logger;
-import java.util.stream.*;
+import java.util.stream.Collectors;
public class APIHandler extends Handler {
public static final String API_URL = "/api/v0/";
- public static final Version CURRENT_VERSION = Version.parse("0.7.9");
+ public static final Version CURRENT_VERSION = Version.parse("0.8.0");
private static final Logger LOG = Logging.LOG();
private static final boolean LOGGING = true;
@@ -52,7 +66,7 @@ public void handleCallToAPI(HttpExchange httpExchange) {
long t1 = System.currentTimeMillis();
String path = httpExchange.getRequestURI().getPath();
try {
- if (! path.startsWith(API_URL))
+ if (!path.startsWith(API_URL))
throw new IllegalStateException("Unsupported api version, required: " + API_URL);
path = path.substring(API_URL.length());
// N.B. URI.getQuery() decodes the query string
@@ -64,7 +78,7 @@ public void handleCallToAPI(HttpExchange httpExchange) {
AggregatedMetrics.API_ID.inc();
PeerId peerId = ipfs.node.getPeerId();
Map res = new HashMap<>();
- res.put("ID", peerId.toBase58());
+ res.put("ID", peerId.toBase58());
replyJson(httpExchange, JSONParser.toString(res));
break;
}
@@ -90,7 +104,7 @@ public void handleCallToAPI(HttpExchange httpExchange) {
.map(a -> Boolean.parseBoolean(a.get(0)))
.orElse(true);
List block = ipfs.getBlocks(List.of(new Want(Cid.decode(args.get(0)), auth)), peers, addToBlockstore);
- if (! block.isEmpty()) {
+ if (!block.isEmpty()) {
replyBytes(httpExchange, block.get(0).block);
} else {
try {
@@ -104,7 +118,7 @@ public void handleCallToAPI(HttpExchange httpExchange) {
case BULK_STAT: {
AggregatedMetrics.API_BLOCK_STAT_BULK.inc();
Map json = (Map) JSONParser.parse(new String(readFully(httpExchange.getRequestBody())));
- List wants = ((List