Commit e389a1c
[KYUUBI #6920][FOLLOWUP] Spark SQL engine supports Spark 4.0

### Why are the changes needed?

There were additional breaking changes after we fixed compatibility with Spark 4.0.0 RC1 in #6920. Spark has since reached 4.0.0 RC6, which is less likely to receive further breaking changes.

### How was this patch tested?

Changes are extracted from #6928, which passed CI with Spark 4.0.0 RC6.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #7061 from pan3793/6920-followup.
Closes #6920

17a1bd9 [Cheng Pan] [KYUUBI #6920][FOLLOWUP] Spark SQL engine supports Spark 4.0

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
(cherry picked from commit e366b09)
Signed-off-by: Cheng Pan <chengpan@apache.org>
1 parent 7bff922 commit e389a1c

File tree: 5 files changed, +239 −21 lines changed

extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala

Lines changed: 52 additions & 9 deletions
@@ -18,19 +18,19 @@
 package org.apache.kyuubi.spark.connector.hive
 
 import java.lang.{Boolean => JBoolean, Long => JLong}
+import java.net.URI
 
 import scala.util.Try
 
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition}
 import org.apache.spark.sql.connector.catalog.TableChange
 import org.apache.spark.sql.connector.catalog.TableChange._
 import org.apache.spark.sql.execution.command.CommandUtils
-import org.apache.spark.sql.execution.command.CommandUtils.{calculateMultipleLocationSizes, calculateSingleLocationSize}
 import org.apache.spark.sql.execution.datasources.{PartitionDirectory, PartitionedFile}
 import org.apache.spark.sql.hive.execution.HiveFileFormat
 import org.apache.spark.sql.internal.SQLConf
@@ -82,7 +82,28 @@ object HiveConnectorUtils extends Logging {
       isSplitable: JBoolean,
       maxSplitBytes: JLong,
       partitionValues: InternalRow): Seq[PartitionedFile] =
-    Try { // SPARK-42821: 4.0.0-preview2
+    Try { // SPARK-42821, SPARK-51185: Spark 4.0
+      val fileStatusWithMetadataClz = DynClasses.builder()
+        .impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
+        .buildChecked()
+      DynMethods
+        .builder("splitFiles")
+        .impl(
+          "org.apache.spark.sql.execution.PartitionedFileUtil",
+          fileStatusWithMetadataClz,
+          classOf[Path],
+          classOf[Boolean],
+          classOf[Long],
+          classOf[InternalRow])
+        .buildChecked()
+        .invokeChecked[Seq[PartitionedFile]](
+          null,
+          file,
+          filePath,
+          isSplitable,
+          maxSplitBytes,
+          partitionValues)
+    }.recover { case _: Exception => // SPARK-42821: 4.0.0-preview2
       val fileStatusWithMetadataClz = DynClasses.builder()
         .impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
         .buildChecked()
@@ -192,19 +213,41 @@ object HiveConnectorUtils extends Logging {
       file.asInstanceOf[FileStatus].getPath
     }.get
 
+  private def calculateMultipleLocationSizes(
+      sparkSession: SparkSession,
+      tid: TableIdentifier,
+      paths: Seq[Option[URI]]): Seq[Long] = {
+
+    val sparkSessionClz = DynClasses.builder()
+      .impl("org.apache.spark.sql.classic.SparkSession") // SPARK-49700 (4.0.0)
+      .impl("org.apache.spark.sql.SparkSession")
+      .build()
+
+    val calculateMultipleLocationSizesMethod =
+      DynMethods.builder("calculateMultipleLocationSizes")
+        .impl(
+          CommandUtils.getClass,
+          sparkSessionClz,
+          classOf[TableIdentifier],
+          classOf[Seq[Option[URI]]])
+        .buildChecked(CommandUtils)
+
+    calculateMultipleLocationSizesMethod
+      .invokeChecked[Seq[Long]](sparkSession, tid, paths)
+  }
+
   def calculateTotalSize(
       spark: SparkSession,
       catalogTable: CatalogTable,
       hiveTableCatalog: HiveTableCatalog): (BigInt, Seq[CatalogTablePartition]) = {
     val sessionState = spark.sessionState
     val startTime = System.nanoTime()
     val (totalSize, newPartitions) = if (catalogTable.partitionColumnNames.isEmpty) {
-      (
-        calculateSingleLocationSize(
-          sessionState,
-          catalogTable.identifier,
-          catalogTable.storage.locationUri),
-        Seq())
+      val tableSize = CommandUtils.calculateSingleLocationSize(
+        sessionState,
+        catalogTable.identifier,
+        catalogTable.storage.locationUri)
+      (tableSize, Seq())
     } else {
       // Calculate table size as a sum of the visible partitions. See SPARK-21079
       val partitions = hiveTableCatalog.listPartitions(catalogTable.identifier)
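Note: both new helpers above resolve the SparkSession class reflectively because SPARK-49700 moved the concrete session implementation to org.apache.spark.sql.classic.SparkSession in Spark 4.0. A minimal sketch of the same fallback using plain JVM reflection (not Kyuubi's DynClasses builder; the object name below is hypothetical):

import scala.util.Try

object SparkSessionClassResolver {
  // Prefer the class location introduced by SPARK-49700 (Spark 4.0) and fall
  // back to the pre-4.0 location when it is not on the classpath.
  def resolve(): Class[_] =
    Try(Class.forName("org.apache.spark.sql.classic.SparkSession"))
      .getOrElse(Class.forName("org.apache.spark.sql.SparkSession"))
}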

extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala

Lines changed: 143 additions & 3 deletions
@@ -17,17 +17,19 @@
 
 package org.apache.kyuubi.spark.connector.hive
 
+import java.lang.{Boolean => JBoolean, Long => JLong}
 import java.net.URI
 import java.util
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.util.Try
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.{SQLConfHelper, TableIdentifier}
+import org.apache.spark.sql.catalyst.{CurrentUserContext, SQLConfHelper, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -47,6 +49,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.kyuubi.spark.connector.hive.HiveConnectorUtils.withSparkSQLConf
 import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.{getStorageFormatAndProvider, toCatalogDatabase, CatalogDatabaseHelper, IdentifierHelper, NamespaceHelper}
 import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorDelegationTokenProvider.metastoreTokenSignature
+import org.apache.kyuubi.util.reflect.{DynClasses, DynConstructors}
 
 /**
  * A [[TableCatalog]] that wrap HiveExternalCatalog to as V2 CatalogPlugin instance to access Hive.
@@ -100,6 +103,20 @@ class HiveTableCatalog(sparkSession: SparkSession)
     catalogName
   }
 
+  private def newHiveMetastoreCatalog(sparkSession: SparkSession): HiveMetastoreCatalog = {
+    val sparkSessionClz = DynClasses.builder()
+      .impl("org.apache.spark.sql.classic.SparkSession") // SPARK-49700 (4.0.0)
+      .impl("org.apache.spark.sql.SparkSession")
+      .buildChecked()
+
+    val hiveMetastoreCatalogCtor =
+      DynConstructors.builder()
+        .impl("org.apache.spark.sql.hive.HiveMetastoreCatalog", sparkSessionClz)
+        .buildChecked[HiveMetastoreCatalog]()
+
+    hiveMetastoreCatalogCtor.newInstanceChecked(sparkSession)
+  }
+
   override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
     assert(catalogName == null, "The Hive table catalog is already initialed.")
     assert(
@@ -110,7 +127,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
     catalog = new HiveSessionCatalog(
       externalCatalogBuilder = () => externalCatalog,
       globalTempViewManagerBuilder = () => globalTempViewManager,
-      metastoreCatalog = new HiveMetastoreCatalog(sparkSession),
+      metastoreCatalog = newHiveMetastoreCatalog(sparkSession),
       functionRegistry = sessionState.functionRegistry,
       tableFunctionRegistry = sessionState.tableFunctionRegistry,
       hadoopConf = hadoopConf,
@@ -166,6 +183,129 @@ class HiveTableCatalog(sparkSession: SparkSession)
     HiveTable(sparkSession, catalog.getTableMetadata(ident.asTableIdentifier), this)
   }
 
+  // scalastyle:off
+  private def newCatalogTable(
+      identifier: TableIdentifier,
+      tableType: CatalogTableType,
+      storage: CatalogStorageFormat,
+      schema: StructType,
+      provider: Option[String] = None,
+      partitionColumnNames: Seq[String] = Seq.empty,
+      bucketSpec: Option[BucketSpec] = None,
+      owner: String = Option(CurrentUserContext.CURRENT_USER.get()).getOrElse(""),
+      createTime: JLong = System.currentTimeMillis,
+      lastAccessTime: JLong = -1,
+      createVersion: String = "",
+      properties: Map[String, String] = Map.empty,
+      stats: Option[CatalogStatistics] = None,
+      viewText: Option[String] = None,
+      comment: Option[String] = None,
+      collation: Option[String] = None,
+      unsupportedFeatures: Seq[String] = Seq.empty,
+      tracksPartitionsInCatalog: JBoolean = false,
+      schemaPreservesCase: JBoolean = true,
+      ignoredProperties: Map[String, String] = Map.empty,
+      viewOriginalText: Option[String] = None): CatalogTable = {
+    // scalastyle:on
+    Try { // SPARK-50675 (4.0.0)
+      DynConstructors.builder()
+        .impl(
+          classOf[CatalogTable],
+          classOf[TableIdentifier],
+          classOf[CatalogTableType],
+          classOf[CatalogStorageFormat],
+          classOf[StructType],
+          classOf[Option[String]],
+          classOf[Seq[String]],
+          classOf[Option[BucketSpec]],
+          classOf[String],
+          classOf[Long],
+          classOf[Long],
+          classOf[String],
+          classOf[Map[String, String]],
+          classOf[Option[CatalogStatistics]],
+          classOf[Option[String]],
+          classOf[Option[String]],
+          classOf[Option[String]],
+          classOf[Seq[String]],
+          classOf[Boolean],
+          classOf[Boolean],
+          classOf[Map[String, String]],
+          classOf[Option[String]])
+        .buildChecked()
+        .invokeChecked[CatalogTable](
+          null,
+          identifier,
+          tableType,
+          storage,
+          schema,
+          provider,
+          partitionColumnNames,
+          bucketSpec,
+          owner,
+          createTime,
+          lastAccessTime,
+          createVersion,
+          properties,
+          stats,
+          viewText,
+          comment,
+          collation,
+          unsupportedFeatures,
+          tracksPartitionsInCatalog,
+          schemaPreservesCase,
+          ignoredProperties,
+          viewOriginalText)
+    }.recover { case _: Exception => // Spark 3.5 and previous
+      DynConstructors.builder()
+        .impl(
+          classOf[CatalogTable],
+          classOf[TableIdentifier],
+          classOf[CatalogTableType],
+          classOf[CatalogStorageFormat],
+          classOf[StructType],
+          classOf[Option[String]],
+          classOf[Seq[String]],
+          classOf[Option[BucketSpec]],
+          classOf[String],
+          classOf[Long],
+          classOf[Long],
+          classOf[String],
+          classOf[Map[String, String]],
+          classOf[Option[CatalogStatistics]],
+          classOf[Option[String]],
+          classOf[Option[String]],
+          classOf[Seq[String]],
+          classOf[Boolean],
+          classOf[Boolean],
+          classOf[Map[String, String]],
+          classOf[Option[String]])
+        .buildChecked()
+        .invokeChecked[CatalogTable](
+          null,
+          identifier,
+          tableType,
+          storage,
+          schema,
+          provider,
+          partitionColumnNames,
+          bucketSpec,
+          owner,
+          createTime,
+          lastAccessTime,
+          createVersion,
+          properties,
+          stats,
+          viewText,
+          comment,
+          unsupportedFeatures,
+          tracksPartitionsInCatalog,
+          schemaPreservesCase,
+          ignoredProperties,
+          viewOriginalText)
+    }.get
+  }
+
   override def createTable(
       ident: Identifier,
       schema: StructType,
@@ -190,7 +330,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
       CatalogTableType.MANAGED
     }
 
-    val tableDesc = CatalogTable(
+    val tableDesc = newCatalogTable(
      identifier = ident.asTableIdentifier,
       tableType = tableType,
       storage = storage,
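Note: newCatalogTable above works around SPARK-50675, which added a collation field to CatalogTable in Spark 4.0, by trying the constructor that takes collation first and recovering to the older signature. A minimal, self-contained sketch of that Try/recover fallback idiom, with hypothetical names standing in for the two constructor paths:

import scala.util.Try

object SparkCompat {
  // Attempt the Spark 4.0 code path first; if the reflective lookup fails
  // because the newer signature is absent, fall back to the Spark 3.5 path.
  def buildCompat[T](spark4Path: => T, spark35Path: => T): T =
    Try(spark4Path)
      .recover { case _: Exception => spark35Path }
      .get
}

// Usage sketch (hypothetical helpers):
// SparkCompat.buildCompat(ctorWithCollation(), ctorWithoutCollation())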
Lines changed: 33 additions & 0 deletions
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.SparkSession
+
+import org.apache.kyuubi.util.reflect.DynMethods
+
+object SparkPlanHelper {
+
+  private val sparkSessionMethod = DynMethods.builder("session")
+    .impl(classOf[SparkPlan])
+    .buildChecked()
+
+  def sparkSession(sparkPlan: SparkPlan): SparkSession = {
+    sparkSessionMethod.invokeChecked[SparkSession](sparkPlan)
+  }
+}
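Note: the new helper calls SparkPlan.session through DynMethods rather than directly, presumably because the session accessor was touched by the SPARK-49700 refactoring in Spark 4.0, which can break binary compatibility for code compiled against Spark 3.x. A usage sketch (the wrapper object below is hypothetical) mirroring the KyuubiArrowConverters change that follows:

import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.{SparkPlan, SparkPlanHelper}

object SparkPlanHelperUsage {
  // Resolve the SparkContext from a physical plan without calling
  // SparkPlan.session directly, so one jar runs on both Spark 3.x and 4.0.
  def sparkContextOf(plan: SparkPlan): SparkContext =
    SparkPlanHelper.sparkSession(plan).sparkContext
}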

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala

Lines changed: 2 additions & 2 deletions
@@ -32,7 +32,7 @@ import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
-import org.apache.spark.sql.execution.CollectLimitExec
+import org.apache.spark.sql.execution.{CollectLimitExec, SparkPlanHelper}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.ArrowUtils
 import org.apache.spark.util.Utils
@@ -157,7 +157,7 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging {
     val partsToScan =
       partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts))
 
-    val sc = collectLimitExec.session.sparkContext
+    val sc = SparkPlanHelper.sparkSession(collectLimitExec).sparkContext
     val res = sc.runJob(
       childRDD,
       (it: Iterator[InternalRow]) => {
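Note: the surrounding code collects a limited result by scanning partitions incrementally — each sc.runJob call covers only the next slice of partitions (partsToScan) instead of the whole RDD. A simplified, illustrative sketch of that pattern against a plain RDD (the names and growth policy below are assumptions, not Kyuubi's code):

import scala.reflect.ClassTag

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object IncrementalTakeSketch {
  // Collect at most `limit` rows by scanning partitions in growing batches,
  // rather than launching one job over every partition up front.
  def takeIncrementally[T: ClassTag](sc: SparkContext, rdd: RDD[T], limit: Int): Seq[T] = {
    val totalParts = rdd.partitions.length
    var buf = Seq.empty[T]
    var partsScanned = 0
    while (buf.length < limit && partsScanned < totalParts) {
      val numPartsToTry = math.max(1, partsScanned) // grow the batch each round (simplified)
      val partsToScan = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts))
      val remaining = limit - buf.length
      val res = sc.runJob(rdd, (it: Iterator[T]) => it.take(remaining).toArray, partsToScan)
      buf = buf ++ res.flatten.take(remaining)
      partsScanned += partsToScan.size
    }
    buf
  }
}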
