Skip to content

Commit

Permalink
Merge pull request Peergos#101 from Peergos/fix/p2p-http-memory-leak
Browse files Browse the repository at this point in the history
Fix p2p http memory leak
  • Loading branch information
ianopolous authored Oct 7, 2024
2 parents d9018ae + 0290ddd commit c3274ed
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 9 deletions.
22 changes: 14 additions & 8 deletions src/main/java/org/peergos/protocol/http/HttpProtocol.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package org.peergos.protocol.http;

import io.ipfs.cid.*;
import io.ipfs.multibase.*;
import io.ipfs.multihash.*;
import io.libp2p.core.*;
import io.libp2p.core.multistream.*;
import io.libp2p.protocol.*;
Expand Down Expand Up @@ -46,9 +43,11 @@ public Sender(Stream stream) {
@Override
public void onMessage(@NotNull Stream stream, FullHttpResponse msg) {
CompletableFuture<FullHttpResponse> req = queue.poll();
if (req != null)
req.complete(msg.copy());
msg.release();
if (req != null) {
req.complete(msg.retain());
} else {
msg.release();
}
stream.close();
}

Expand Down Expand Up @@ -86,7 +85,7 @@ public Receiver(HttpRequestProcessor requestHandler) {
}

private void sendReply(HttpContent reply, Stream p2pstream) {
p2pstream.writeAndFlush(reply.copy());
p2pstream.writeAndFlush(reply.retain());
}

@Override
Expand Down Expand Up @@ -120,7 +119,14 @@ protected void initChannel(Channel ch) throws Exception {
Channel ch = fut.channel();

FullHttpRequest retained = msg.retain();
fut.addListener(x -> ch.writeAndFlush(retained));
fut.addListener(x -> {
if (x.isSuccess())
ch.writeAndFlush(retained).addListener(f -> {
retained.release();
});
else
retained.release();
});
}

private static final long TRAFFIC_LIMIT = Long.MAX_VALUE; // This is the total inbound or outbound traffic allowed, not a rate
Expand Down
64 changes: 63 additions & 1 deletion src/test/java/org/peergos/EmbeddedIpfsTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.peergos;

import com.sun.net.httpserver.HttpServer;
import identify.pb.*;
import io.ipfs.cid.*;
import io.ipfs.multiaddr.*;
Expand All @@ -9,12 +10,22 @@
import io.libp2p.core.multiformats.*;
import io.libp2p.crypto.keys.*;
import io.libp2p.protocol.*;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
import org.junit.*;
import org.peergos.blockstore.*;
import org.peergos.config.*;
import org.peergos.protocol.dht.*;
import org.peergos.protocol.http.HttpProtocol;
import org.peergos.protocol.ipns.*;

import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.time.*;
import java.util.*;
import java.util.concurrent.*;
Expand Down Expand Up @@ -43,6 +54,53 @@ public void largeBlock() throws Exception {
node2.stop();
}

@Test
public void largeWrite() throws Exception {
System.setProperty("io.netty.leakDetection.level", "advanced");
// Start proxy target
InetSocketAddress proxyTarget = new InetSocketAddress("localhost", 7777);
HttpServer target = HttpServer.create(proxyTarget, 20);
String reply = "AllGood";
byte[] replyBytes = reply.getBytes();
target.createContext("/", ex -> {
ex.sendResponseHeaders(200, replyBytes.length);
OutputStream out = ex.getResponseBody();
out.write(replyBytes);
out.flush();
out.close();
});
target.start();

HttpProtocol.HttpRequestProcessor http1 = (s, req, h) -> HttpProtocol.proxyRequest(req, proxyTarget, h);
EmbeddedIpfs node1 = build(Collections.emptyList(), List.of(new MultiAddress("/ip4/127.0.0.1/tcp/" + TestPorts.getPort())), Optional.of(http1));
node1.start();

HttpProtocol.HttpRequestProcessor http2 = (s, req, h) -> HttpProtocol.proxyRequest(req, new InetSocketAddress("localhost", 7778), h);
EmbeddedIpfs node2 = build(node1.node.listenAddresses()
.stream()
.map(a -> new MultiAddress(a.toString()))
.collect(Collectors.toList()), List.of(new MultiAddress("/ip4/127.0.0.1/tcp/" + TestPorts.getPort())), Optional.of(http2));
node2.start();

for (int i = 0; i < 1000; i++) {
ByteBuf largeBody = Unpooled.buffer(2 * 1024 * 1024);
DefaultFullHttpRequest req = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/hey", largeBody);
HttpProtocol.HttpController http = node2.p2pHttp.get().dial(node2.node, node1.node.getPeerId(), node1.node.listenAddresses().toArray(Multiaddr[]::new))
.getController().join();
FullHttpResponse resp = http.send(req).join();
ByteArrayOutputStream bout = new ByteArrayOutputStream();
int contentLength = resp.headers().getInt("content-length");
resp.content().readBytes(bout, contentLength);
byte[] body = bout.toByteArray();
Assert.assertTrue("Correct response", Arrays.equals(body, replyBytes));
resp.release();
resp.release();
}

node1.stop();
node2.stop();
}

@Test
public void mdnsDiscovery() throws Exception {
EmbeddedIpfs node1 = build(Collections.emptyList(), List.of(new MultiAddress("/ip4/127.0.0.1/tcp/" + TestPorts.getPort())));
Expand Down Expand Up @@ -138,12 +196,16 @@ public void wildcardListenerAddressesGetExpanded() {
}

public static EmbeddedIpfs build(List<MultiAddress> bootstrap, List<MultiAddress> swarmAddresses) {
return build(bootstrap, swarmAddresses, Optional.empty());
}

public static EmbeddedIpfs build(List<MultiAddress> bootstrap, List<MultiAddress> swarmAddresses, Optional<HttpProtocol.HttpRequestProcessor> http) {
BlockRequestAuthoriser blockRequestAuthoriser = (c, p, a) -> CompletableFuture.completedFuture(true);
HostBuilder builder = new HostBuilder().generateIdentity();
PrivKey privKey = builder.getPrivateKey();
PeerId peerId = builder.getPeerId();
IdentitySection id = new IdentitySection(privKey.bytes(), peerId);
return EmbeddedIpfs.build(new RamRecordStore(), new RamBlockstore(), true, swarmAddresses, bootstrap,
id, blockRequestAuthoriser, Optional.empty());
id, blockRequestAuthoriser, http);
}
}

0 comments on commit c3274ed

Please sign in to comment.