Unable to run Spark jobs on an Azure Databricks cluster: java.lang.NullPointerException at org.apache.spark.util.LocalHadoopConfiguration.set(SparkHadoopConfiguration.scala:196) #102

Closed
jochenhebbrecht opened this issue Nov 8, 2021 · 14 comments
Labels
bug spark Spark connector

Comments


jochenhebbrecht commented Nov 8, 2021

Hi,

Since last Friday, 5 November, we have been unable to use the osm4scala library on an Azure Databricks cluster with configuration 8.3 (includes Apache Spark 3.1.1, Scala 2.12).

When we try to run this simple command in a notebook:

spark.read
     .format("osm.pbf")
     .load("dbfs:/FileStore/shared_uploads/andorra_latest_osm.pbf")
     .select("id","type")
     .show()

... we get the following exception:

Caused by: java.lang.NullPointerException
	at org.apache.spark.util.LocalHadoopConfiguration.set(SparkHadoopConfiguration.scala:196)
	at org.apache.hadoop.conf.Configuration.set(Configuration.java:1115)
	at org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2877)
	at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285)
	at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77)
	at org.apache.spark.SerializableWritable.$anonfun$readObject$1(SerializableWritable.scala:45)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1609)
	at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:41)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2296)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
	at org.apache.spark.broadcast.TorrentBroadcast$.$anonfun$unBlockifyObject$4(TorrentBroadcast.scala:372)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1643)
	at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:374)
	at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$4(TorrentBroadcast.scala:279)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$2(TorrentBroadcast.scala:253)
	at org.apache.spark.util.KeyLock.withLock(KeyLock.scala:64)
	at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$1(TorrentBroadcast.scala:248)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1609)
	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:248)
	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:118)
	at org.apache.spark.broadcast.Broadcast.$anonfun$value$1(Broadcast.scala:80)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:78)
	at com.acervera.osm4scala.spark.OsmPbfFormat.$anonfun$buildReader$1(OsmPbfFormat.scala:86)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:155)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:142)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:331)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:475)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.$anonfun$hasNext$1(FileScanRDD.scala:300)
	at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:295)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:757)
	at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:80)
	at org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$1(Collector.scala:178)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:150)
	at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:119)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.Task.run(Task.scala:91)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:812)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1643)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:815)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:671)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

As we have run osm4scala many times and didn't change anything on our side, we escalated this as a critical support case to Azure.

Unfortunately, this is the feedback we've received:

  • we no longer have the problem if we downgrade to 1.0.6
  • it is worth checking with the creators/support group of this library to see what changes were made after version 1.0.6. As this is a third-party library, we don't have visibility into its internal configuration.
    (c) Azure Support

If we follow the stack trace, we end up at this part of your code:

com.acervera.osm4scala.spark.OsmPbfFormat:86

... which is this line:

val fs = path.getFileSystem(broadcastedHadoopConf.value.value)
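For context, the surrounding code presumably follows Spark's standard broadcast pattern, roughly like the sketch below. This is illustrative only, not the exact osm4scala source: the method name is made up, and it requires a live SparkSession to actually run.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SerializableWritable
import org.apache.spark.sql.SparkSession

// Sketch of the broadcast pattern around OsmPbfFormat.scala:86
// (names approximate; illustrative only).
def buildReaderSketch(sparkSession: SparkSession, hadoopConf: Configuration): Unit = {
  // Driver side: a Hadoop Configuration is not Serializable on its own,
  // so it is wrapped in SerializableWritable before being broadcast.
  val broadcastedHadoopConf =
    sparkSession.sparkContext.broadcast(new SerializableWritable(hadoopConf))

  // Executor side (inside the per-file reader closure): the double .value
  // unwraps Broadcast -> SerializableWritable -> Configuration. The NPE in
  // the stack trace above fires while deserializing this wrapped
  // Configuration on the executor, before the reader line even runs:
  // val fs = path.getFileSystem(broadcastedHadoopConf.value.value)
}
```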

Do you have any insight into what could be causing this behavior?

It is also unclear where the following classes originate. I would assume they are Azure Databricks specific, but Azure support has not confirmed this.

org.apache.spark.util.LocalHadoopConfiguration
org.apache.spark.util.SparkHadoopConfiguration

jochenhebbrecht commented Nov 8, 2021

Aha, I was able to get it working again:

  • go to src/main/scala/com/acervera/osm4scala/spark/OsmPbfFormat.scala
  • revert this file to commit id d4d706b
  • build the entire package: sbt assembly
  • deploy the snapshot version on Azure Databricks

The result is that it works again. So I can tell you with 100% confidence that a bug was introduced in commit 5f54f8d, but it was only recently exposed (by an underlying Azure Databricks upgrade).

Unfortunately, I cannot yet tell you what exactly is wrong in OsmPbfFormat.scala.


angelcervera commented Nov 9, 2021

Hi @jochenhebbrecht
Which version of osm4scala are you using? It looks like line 86 is a comment.

I'm going to try to reproduce the error using the latest version, v1.0.10.

BTW, the last time I tested it in Azure was 7 months ago, so the version tested there should be 1.0.7.

@jochenhebbrecht

Yes, sorry, we are using 1.0.9 in production. We haven't upgraded to 1.0.10 yet because we still have to revert the timestamp workaround. So we're talking about this line. I've tested with 1.0.10 as well, and the problem remains.

In the meantime, Azure support gave us the option to revert to the previous Databricks environment, but according to them their underlying upgrade merely exposed a bug in osm4scala, and we would need to dig deeper into that.


angelcervera commented Nov 9, 2021

I was able to reproduce it in Azure.
The bug looks localized at com.acervera.osm4scala.spark.OsmPbfFormat.$anonfun$buildReader$1(OsmPbfFormat.scala:153), which is the equivalent of line 86 in the other version.

It is weird, because this line just gets the file system, nothing special. Also, the NPE is triggered in a class unrelated to the reader. It looks like the Hadoop configuration is not being broadcast properly.


vanhove commented Nov 9, 2021

Yes, it looks like the line val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableWritable(hadoopConf)) would need to be reverted to use SerializableConfiguration (similar to ParquetFileFormat).

I noticed that this change was made in relation to a bug, but it's not clear what the cause was: #62
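For illustration, the suggested change would look roughly like the sketch below. This is an assumption-laden sketch, not the actual osm4scala diff: the method name is invented, and it requires a Spark runtime. SerializableConfiguration lives in org.apache.spark.util and is the wrapper ParquetFileFormat uses.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.SerializableConfiguration

// Sketch of the proposed fix (illustrative; names approximate).
def broadcastHadoopConf(
    sparkSession: SparkSession,
    hadoopConf: Configuration): Broadcast[SerializableConfiguration] = {
  // Current osm4scala (since commit 5f54f8d): SerializableWritable round-trips
  // the Configuration through Hadoop's Writable readFields/write machinery,
  // which is where the NPE surfaces on Azure Databricks:
  // sparkSession.sparkContext.broadcast(new SerializableWritable(hadoopConf))

  // Suggested (mirroring ParquetFileFormat): SerializableConfiguration
  // serializes the Configuration's key/value properties directly instead.
  sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
  // The executor side keeps the same shape: broadcastedHadoopConf.value.value
}
```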


angelcervera commented Nov 9, 2021

That fixed a backward-compatibility problem with Spark 2. So the problem could be a combination of the Spark version and the Hadoop version.
No time to work on it now; I hope to get to it this evening.

Could you try running it with Spark 2.4? Just to know whether it still works there.

@angelcervera

In AWS EMR with Hadoop 3.2.1 and Spark 3.1.2, version 1.0.10 works without problems. Looks like there is something weird in Azure Databricks.


jochenhebbrecht commented Nov 10, 2021

Thanks, Ángel, for already taking a look at this.

When downgrading to 6.4 Extended Support (includes Apache Spark 2.4.5, Scala 2.11) and installing the osm4scala-spark2-shaded_2.11-1.0.10.jar library, I can no longer reproduce the problem.

We still have the support case open, and Azure is also looking further into this problem on their side. I'll pass this feedback along as well. It is still entirely possible that something is broken on the Azure Databricks side and that your code is perfectly fine.

We are also still waiting for the source code on their side, to better understand where the NPE occurs.

@angelcervera

I will research this further on Azure, but I will not have time until the weekend. Also, this type of error requires infrastructure that is not free. Do you know if Microsoft or Databricks would give me resources for testing? That would be great. Could you ask them?
Thanks

@jochenhebbrecht

That sounds like a great idea, and it's definitely something I would like to achieve. I will contact the Azure support team and ask whether you could get some resources to investigate this bug further.

@jochenhebbrecht

I cannot reproduce the problem on 8.3 (includes Apache Spark 3.1.1, Scala 2.12) and osm4scala 1.0.10 with cluster mode set to Single Node (instead of Standard). This indeed suggests the problem lies in serializing the Hadoop configuration.

@jochenhebbrecht

Feedback from Azure

The Databricks team has already identified that the issue was caused by a maintenance release. They are currently investigating which specific change caused it, and a permanent fix will be planned accordingly.

So I would not worry too much about your codebase; let's give the support team some time and freedom to first investigate what they may have broken.

With regards to asking resources to reproduce it on your end:

Thank you for letting us know about this. During the investigation if we get to a stage where help from third party library owner is required, we would definitely explore this option. Given that the investigation is underway, this is not immediately needed. But we appreciate your suggestion and willingness to collaborate.

I would suggest closing this ticket for now. We can always reopen it if Azure Databricks comes back to us indicating something is invalid in the osm4scala codebase.

@angelcervera

Hi @jochenhebbrecht, thanks for the update. I will close the ticket, but let's recap so we keep it in mind. It may be necessary to change something in the connector related to this, such as finding a way to avoid serializing the Hadoop configuration object.

Tested in cloud providers with Spark 3.

  • AWS EMR multinode cluster worked fine.
  • Azure Databricks single node worked fine.
  • Azure Databricks multinode failed.

Looks like the error is related to Hadoop config serialization. That makes sense, because in a single-node execution serialization is not needed, so it does not fail.

FYI: I created ticket #103 to be able to test these types of problems quickly.

@angelcervera angelcervera added the spark Spark connector label Nov 10, 2021

vanhove commented Nov 17, 2021

Just wanted to confirm that Databricks fixed the issue on Azure.
