From 424ba5a84f0b8e52a0c51c34b9b0a695ae76ca4d Mon Sep 17 00:00:00 2001 From: David Baker Effendi Date: Mon, 22 Jul 2024 12:53:34 +0200 Subject: [PATCH] Added additional profiler, no longer killing after timeouts, improving neo4j resource handling --- .../plume/oss/drivers/Neo4jEmbeddedDriver.scala | 8 ++++---- runBenchmarks.sc | 15 ++++++++------- .../scala/com/github/plume/oss/Benchmark.scala | 8 ++------ .../oss/benchmarking/GraphReadBenchmark.scala | 5 ++++- .../oss/benchmarking/GraphWriteBenchmark.scala | 2 ++ 5 files changed, 20 insertions(+), 18 deletions(-) diff --git a/drivers/neo4j-embedded/src/main/scala/com/github/plume/oss/drivers/Neo4jEmbeddedDriver.scala b/drivers/neo4j-embedded/src/main/scala/com/github/plume/oss/drivers/Neo4jEmbeddedDriver.scala index 8e840853..40765144 100644 --- a/drivers/neo4j-embedded/src/main/scala/com/github/plume/oss/drivers/Neo4jEmbeddedDriver.scala +++ b/drivers/neo4j-embedded/src/main/scala/com/github/plume/oss/drivers/Neo4jEmbeddedDriver.scala @@ -138,7 +138,7 @@ final class Neo4jEmbeddedDriver( Try(tx.execute(query, params)) match { case Failure(e) => logger.error(s"Unable to write bulk create node transaction $query", e) - case Success(_) => + case Success(x) => x.close() } } tx.commit() @@ -174,7 +174,7 @@ final class Neo4jEmbeddedDriver( Try(tx.execute(query, params)) match { case Failure(e) => logger.error(s"Unable to write bulk set node property transaction $query", e) - case Success(_) => + case Success(x) => x.close() } } tx.commit() @@ -200,7 +200,7 @@ final class Neo4jEmbeddedDriver( ) match { case Failure(e) => logger.error(s"Unable to write bulk create edge transaction $query", e) - case Success(_) => + case Success(x) => x.close() } } tx.commit() @@ -253,7 +253,7 @@ final class Neo4jEmbeddedDriver( Using.resource(graphDb.beginTx) { tx => val payload = buildSchemaPayload() try { - payload.lines().forEach(line => tx.execute(line)) + payload.lines().forEach(line => tx.execute(line).close()) } catch { case e: Exception => logger.error(s"Unable to set schema: $payload", e) diff --git a/runBenchmarks.sc b/runBenchmarks.sc index 513e85ff..2139523b 100644 --- a/runBenchmarks.sc +++ b/runBenchmarks.sc @@ -84,8 +84,8 @@ def runAndMonitorBenchmarkProcess(cmd: String, writeOutputFile: File, readOutput writeOutputFile.createIfNotExists readOutputFile.createIfNotExists - val processBuilder = new java.lang.ProcessBuilder("sbt", cmd) - .redirectOutput(File(writeOutputFile.getAbsolutePath.stripSuffix("write.txt") + "sbt.txt")) + val sbtFile = File(writeOutputFile.getAbsolutePath.stripSuffix("write.txt") + "sbt.txt") + val processBuilder = new java.lang.ProcessBuilder("sbt", cmd).redirectOutput(sbtFile) // Ignore locks for aborted JMH processes val env = processBuilder.environment @@ -101,10 +101,11 @@ def runAndMonitorBenchmarkProcess(cmd: String, writeOutputFile: File, readOutput try { var line: String = null while ({ line = reader.readLine(); line != null }) { - if (line.contains("benchmark timed out")) { - println("Timeout detected. Sending Ctrl+C signal to process...") - shouldTerminate = true - } else if (line.contains("java.lang.OutOfMemoryError")) { +// if (line.contains("benchmark timed out")) { +// println("Timeout detected. Sending Ctrl+C signal to process...") +// shouldTerminate = true +// } + if (line.contains("java.lang.OutOfMemoryError")) { println("OutOfMemoryError detected. Sending Ctrl+C signal to process...") shouldTerminate = true } @@ -123,7 +124,7 @@ def runAndMonitorBenchmarkProcess(cmd: String, writeOutputFile: File, readOutput var shouldTerminate = false while (!shouldTerminate && process.isAlive) { Thread.sleep(5000) - shouldTerminate = readLogsForErrors(writeOutputFile) || readLogsForErrors(readOutputFile) + shouldTerminate = readLogsForErrors(writeOutputFile) || readLogsForErrors(readOutputFile) || readLogsForErrors(sbtFile) } } diff --git a/src/main/scala/com/github/plume/oss/Benchmark.scala b/src/main/scala/com/github/plume/oss/Benchmark.scala index 46bb1088..7594f7d4 100644 --- a/src/main/scala/com/github/plume/oss/Benchmark.scala +++ b/src/main/scala/com/github/plume/oss/Benchmark.scala @@ -9,7 +9,7 @@ import com.github.plume.oss.benchmarking.{ TinkerGraphReadBenchmark } import com.github.plume.oss.drivers.{IDriver, TinkerGraphDriver} -import org.cache2k.benchmark.jmh.HeapProfiler +import org.cache2k.benchmark.jmh.{HeapProfiler, LinuxVmProfiler} import org.openjdk.jmh.annotations.Mode import org.openjdk.jmh.runner.Runner import org.openjdk.jmh.runner.options.{ChainedOptionsBuilder, OptionsBuilder, TimeValue} @@ -24,7 +24,6 @@ object Benchmark { .foreach { config => val writeOptsBenchmark = createOptionsBoilerPlate(config, WRITE) .include(classOf[GraphWriteBenchmark].getSimpleName) - .warmupIterations(5) .build() new Runner(writeOptsBenchmark).run() println( @@ -36,21 +35,18 @@ object Benchmark { Option( createOptionsBoilerPlate(config, READ) .include(classOf[TinkerGraphReadBenchmark].getSimpleName) - .warmupIterations(1) .build() ) case _: OverflowDbConfig => Option( createOptionsBoilerPlate(config, READ) .include(classOf[OverflowDbReadBenchmark].getSimpleName) - .warmupIterations(1) .build() ) case _: Neo4jEmbeddedConfig => Option( createOptionsBoilerPlate(config, READ) .include(classOf[Neo4jEmbedReadBenchmark].getSimpleName) - .warmupIterations(1) .build() ) case x => @@ -70,8 +66,8 @@ object Benchmark { private def createOptionsBoilerPlate(config: PlumeConfig, benchmarkType: BenchmarkType): ChainedOptionsBuilder = { new OptionsBuilder() .addProfiler(classOf[HeapProfiler]) + .addProfiler(classOf[LinuxVmProfiler]) .warmupTime(TimeValue.seconds(30)) - .measurementIterations(3) .mode(Mode.AverageTime) .forks(1) .output(s"${config.jmhOutputFile}-${benchmarkType.toString.toLowerCase}.txt") diff --git a/src/main/scala/com/github/plume/oss/benchmarking/GraphReadBenchmark.scala b/src/main/scala/com/github/plume/oss/benchmarking/GraphReadBenchmark.scala index 1690b28e..0b4fd27f 100644 --- a/src/main/scala/com/github/plume/oss/benchmarking/GraphReadBenchmark.scala +++ b/src/main/scala/com/github/plume/oss/benchmarking/GraphReadBenchmark.scala @@ -15,7 +15,8 @@ import org.openjdk.jmh.annotations.{ Setup, State, TearDown, - Timeout + Timeout, + Warmup } import org.openjdk.jmh.infra.{BenchmarkParams, Blackhole} @@ -25,6 +26,8 @@ import scala.compiletime.uninitialized @State(Scope.Benchmark) @Timeout(5, TimeUnit.MINUTES) @OutputTimeUnit(TimeUnit.MILLISECONDS) +@Measurement(iterations = 3, time = 5, timeUnit = TimeUnit.SECONDS) +@Warmup(iterations = 1, time = 1, timeUnit = TimeUnit.SECONDS) trait GraphReadBenchmark { @Param(Array("")) diff --git a/src/main/scala/com/github/plume/oss/benchmarking/GraphWriteBenchmark.scala b/src/main/scala/com/github/plume/oss/benchmarking/GraphWriteBenchmark.scala index 45021026..811df522 100644 --- a/src/main/scala/com/github/plume/oss/benchmarking/GraphWriteBenchmark.scala +++ b/src/main/scala/com/github/plume/oss/benchmarking/GraphWriteBenchmark.scala @@ -13,6 +13,8 @@ import scala.compiletime.uninitialized @State(Scope.Benchmark) @Timeout(6, TimeUnit.MINUTES) @OutputTimeUnit(TimeUnit.SECONDS) +@Measurement(iterations = 10, time = 2, timeUnit = TimeUnit.MINUTES) +@Warmup(iterations = 1, time = 5, timeUnit = TimeUnit.SECONDS) class GraphWriteBenchmark { @Param(Array(""))