
Commit e22a095

[KYUUBI #6920][FOLLOWUP] Spark SQL engine supports Spark 4.0
There were some breaking changes after we fixed compatibility for Spark 4.0.0 RC1 in #6920, but Spark has now reached 4.0.0 RC6, which is less likely to receive further breaking changes. The changes here are extracted from #6928, which passed CI with Spark 4.0.0 RC6.

Closes #7061 from pan3793/6920-followup.
Closes #6920

17a1bd9 [Cheng Pan] [KYUUBI #6920][FOLLOWUP] Spark SQL engine supports Spark 4.0

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
1 parent 215994a, commit e22a095

File tree: 3 files changed (+224, -51 lines)


extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala

Lines changed: 59 additions & 10 deletions
@@ -18,19 +18,19 @@
 package org.apache.kyuubi.spark.connector.hive
 
 import java.lang.{Boolean => JBoolean, Long => JLong}
+import java.net.URI
 
 import scala.util.Try
 
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition}
 import org.apache.spark.sql.connector.catalog.TableChange
 import org.apache.spark.sql.connector.catalog.TableChange._
 import org.apache.spark.sql.execution.command.CommandUtils
-import org.apache.spark.sql.execution.command.CommandUtils.{calculateMultipleLocationSizes, calculateSingleLocationSize}
 import org.apache.spark.sql.execution.datasources.{PartitionDirectory, PartitionedFile}
 import org.apache.spark.sql.hive.execution.HiveFileFormat
 import org.apache.spark.sql.internal.SQLConf
@@ -82,7 +82,28 @@ object HiveConnectorUtils extends Logging {
       isSplitable: JBoolean,
       maxSplitBytes: JLong,
       partitionValues: InternalRow): Seq[PartitionedFile] =
-    Try { // SPARK-42821: 4.0.0-preview2
+    Try { // SPARK-42821, SPARK-51185: Spark 4.0
+      val fileStatusWithMetadataClz = DynClasses.builder()
+        .impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
+        .buildChecked()
+      DynMethods
+        .builder("splitFiles")
+        .impl(
+          "org.apache.spark.sql.execution.PartitionedFileUtil",
+          fileStatusWithMetadataClz,
+          classOf[Path],
+          classOf[Boolean],
+          classOf[Long],
+          classOf[InternalRow])
+        .buildChecked()
+        .invokeChecked[Seq[PartitionedFile]](
+          null,
+          file,
+          filePath,
+          isSplitable,
+          maxSplitBytes,
+          partitionValues)
+    }.recover { case _: Exception => // SPARK-42821: 4.0.0-preview2
       val fileStatusWithMetadataClz = DynClasses.builder()
         .impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
         .buildChecked()
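The added branch keeps the 4.0.0-preview2 path as a fallback: the call is first resolved against the Spark 4.0 signature, and any failure drops through to the previous one. A minimal sketch of that Try/recover chain, with hypothetical stand-in functions instead of the real DynMethods-resolved splitFiles calls:

import scala.util.Try

object SplitFilesFallbackSketch {
  // Hypothetical stand-ins for the version-specific call sites; in HiveConnectorUtils
  // each branch actually resolves PartitionedFileUtil.splitFiles reflectively via DynMethods.
  private def splitFilesSpark40(): Seq[String] =
    sys.error("Spark 4.0 signature not present in this runtime")
  private def splitFilesSpark40Preview2(): Seq[String] =
    Seq("resolved via the 4.0.0-preview2 signature")

  // The same control flow as the diff: try the newest signature first and drop
  // through to the older one only when the reflective lookup or call fails.
  def splitFiles(): Seq[String] =
    Try(splitFilesSpark40())
      .recover { case _: Exception => splitFilesSpark40Preview2() }
      .get
}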
@@ -192,19 +213,41 @@ object HiveConnectorUtils extends Logging {
       file.asInstanceOf[FileStatus].getPath
     }.get
 
+  private def calculateMultipleLocationSizes(
+      sparkSession: SparkSession,
+      tid: TableIdentifier,
+      paths: Seq[Option[URI]]): Seq[Long] = {
+
+    val sparkSessionClz = DynClasses.builder()
+      .impl("org.apache.spark.sql.classic.SparkSession") // SPARK-49700 (4.0.0)
+      .impl("org.apache.spark.sql.SparkSession")
+      .build()
+
+    val calculateMultipleLocationSizesMethod =
+      DynMethods.builder("calculateMultipleLocationSizes")
+        .impl(
+          CommandUtils.getClass,
+          sparkSessionClz,
+          classOf[TableIdentifier],
+          classOf[Seq[Option[URI]]])
+        .buildChecked(CommandUtils)
+
+    calculateMultipleLocationSizesMethod
+      .invokeChecked[Seq[Long]](sparkSession, tid, paths)
+  }
+
   def calculateTotalSize(
       spark: SparkSession,
       catalogTable: CatalogTable,
       hiveTableCatalog: HiveTableCatalog): (BigInt, Seq[CatalogTablePartition]) = {
     val sessionState = spark.sessionState
     val startTime = System.nanoTime()
     val (totalSize, newPartitions) = if (catalogTable.partitionColumnNames.isEmpty) {
-      (
-        calculateSingleLocationSize(
-          sessionState,
-          catalogTable.identifier,
-          catalogTable.storage.locationUri),
-        Seq())
+      val tableSize = CommandUtils.calculateSingleLocationSize(
+        sessionState,
+        catalogTable.identifier,
+        catalogTable.storage.locationUri)
+      (tableSize, Seq())
     } else {
       // Calculate table size as a sum of the visible partitions. See SPARK-21079
       val partitions = hiveTableCatalog.listPartitions(catalogTable.identifier)
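The two .impl(...) candidates make DynClasses resolve the first class name that loads in the current runtime; SPARK-49700 (4.0.0) moved the concrete session class to org.apache.spark.sql.classic, so that name is tried first. A rough plain-reflection equivalent, assuming a Spark jar is on the classpath (only the two class names come from the diff above):

import scala.util.Try

object SparkSessionClassSketch {
  // Resolve whichever SparkSession class the running Spark version provides,
  // newest candidate first; the result is then used to pick the matching
  // CommandUtils.calculateMultipleLocationSizes overload.
  val sparkSessionClass: Class[_] =
    Try(Class.forName("org.apache.spark.sql.classic.SparkSession")) // Spark 4.0, SPARK-49700
      .orElse(Try(Class.forName("org.apache.spark.sql.SparkSession"))) // Spark 3.x
      .get
}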
@@ -402,7 +445,13 @@ object HiveConnectorUtils extends Logging {
     new StructType(newFields)
   }
 
-  def withSQLConf[T](pairs: (String, String)*)(f: => T): T = {
+  // This is a fork of Spark's withSQLConf. We use a different name to avoid linkage
+  // issues in cross-version cases.
+  // For example, SPARK-46227 (4.0.0) moves `withSQLConf` from SQLHelper to SQLConfHelper;
+  // classes that extend SQLConfHelper prefer to link against the superclass's method when
+  // compiled with Spark 4.0, so a linkage error occurs when the jar runs on lower
+  // Spark versions.
+  def withSparkSQLConf[T](pairs: (String, String)*)(f: => T): T = {
     val conf = SQLConf.get
     val (keys, values) = pairs.unzip
     val currentValues = keys.map { key =>
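The renamed helper is used the same way Spark's withSQLConf is: set the given conf pairs, run the block, then restore the previous values. A hypothetical caller, with an arbitrary conf key chosen only for illustration:

import org.apache.spark.sql.internal.SQLConf
import org.apache.kyuubi.spark.connector.hive.HiveConnectorUtils

// Temporarily flip a SQL conf for one block; withSparkSQLConf restores the previous
// value (or unsets the key) once the block finishes.
val caseSensitive: String =
  HiveConnectorUtils.withSparkSQLConf("spark.sql.caseSensitive" -> "true") {
    SQLConf.get.getConfString("spark.sql.caseSensitive") // sees the temporary value
  }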
