Skip to content

Commit bc36a7d

Browse files
bersprocketspeter-toth
authored andcommitted
[SPARK-53275][SQL] Handle stateful expressions when ordering in interpreted mode
### What changes were proposed in this pull request? This PR updates `InterpretedOrdering` to use a different copy of stateful expressions when evaluating the two input rows. ### Why are the changes needed? Consider these spark-shell commands: ``` # for this particular example, the bug is exercised when there are 2 executors bin/spark-shell --master "local[2]" import org.apache.spark.sql.functions.udf spark.udf.register("udf", (s: String) => s) Seq((0, "2"), (0, "1")).toDF("a", "b").createOrReplaceTempView("v1") // return a correct result: Array([0,1], [0,2]) sql("select a, udf(b) from v1 order by a, udf(b) asc").collect // run in interpreted mode sql("set spark.sql.codegen.factoryMode=NO_CODEGEN") // return an incorrect result: Array([0,2], [0,1]) sql("select a, udf(b) from v1 order by a, udf(b) asc").collect ``` This is because the `ScalaUDF` expression indirectly holds an UnsafeRow as a buffer (via a serializer, which holds an `UnsafeProjection`, which holds the `UnsafeRow` buffer). When the udf is evaluated for the first row, the resulting `UTF8String` uses the `UnsafeRow`'s base object as its own base object. When the udf is evaluated for the second row, that same base object is updated such that both `UTF8String` objects contain the same string value. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New unit test. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #52028 from bersprockets/ordering_issue. Authored-by: Bruce Robbins <bersprockets@gmail.com> Signed-off-by: Peter Toth <peter.toth@gmail.com>
1 parent c13c10f commit bc36a7d

File tree

2 files changed

+26
-2
lines changed

2 files changed

+26
-2
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ class BaseOrdering extends Ordering[InternalRow] {
3838
* An interpreted row ordering comparator.
3939
*/
4040
class InterpretedOrdering(ordering: Seq[SortOrder]) extends BaseOrdering {
41+
private val leftEvaluators = ordering.map(_.child)
42+
private val rightEvaluators = leftEvaluators.map(_.freshCopyIfContainsStatefulExpression())
4143
private lazy val physicalDataTypes = ordering.map { order =>
4244
val dt = order.dataType match {
4345
case udt: UserDefinedType[_] => udt.sqlType
@@ -54,8 +56,8 @@ class InterpretedOrdering(ordering: Seq[SortOrder]) extends BaseOrdering {
5456
val size = ordering.size
5557
while (i < size) {
5658
val order = ordering(i)
57-
val left = order.child.eval(a)
58-
val right = order.child.eval(b)
59+
val left = leftEvaluators(i).eval(a)
60+
val right = rightEvaluators(i).eval(b)
5961

6062
if (left == null && right == null) {
6163
// Both null, continue looking.

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ import org.apache.spark.serializer.KryoSerializer
2424
import org.apache.spark.sql.{RandomDataGenerator, Row}
2525
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
2626
import org.apache.spark.sql.catalyst.dsl.expressions._
27+
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
2728
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, GenerateOrdering, LazilyGeneratedOrdering}
29+
import org.apache.spark.sql.internal.SQLConf
2830
import org.apache.spark.sql.types._
2931
import org.apache.spark.util.ArrayImplicits._
3032

@@ -166,4 +168,24 @@ class OrderingSuite extends SparkFunSuite with ExpressionEvalHelper {
166168
GenerateOrdering.genComparisons(ctx, schema)
167169
assert(ctx.INPUT_ROW == null)
168170
}
171+
172+
test("SPARK-53275: ordering by stateful expressions in interpreted mode") {
173+
// even though we explicitly create an InterpretedOrdering below, we still need
174+
// to set CODEGEN_FACTORY_MODE to NO_CODEGEN because the ScalaUDF expression will
175+
// indirectly create an UnsafeProjection, and we want that UnsafeProjection to be
176+
// an InterpretedUnsafeProjection
177+
withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> CodegenObjectFactoryMode.NO_CODEGEN.toString) {
178+
val udfFunc = (s: String) => s
179+
val stringUdf = ScalaUDF(udfFunc, StringType, BoundReference(0, StringType, true) :: Nil,
180+
Option(ExpressionEncoder[String]().resolveAndBind()) :: Nil,
181+
Some(ExpressionEncoder[String]().resolveAndBind()))
182+
val sortOrder = Seq(SortOrder(stringUdf, Ascending))
183+
val rowOrdering = new InterpretedOrdering(sortOrder)
184+
val rowType = StructType(StructField("col1", StringType, nullable = true) :: Nil)
185+
val toCatalyst = CatalystTypeConverters.createToCatalystConverter(rowType)
186+
val rowB1 = toCatalyst(Row("B")).asInstanceOf[InternalRow]
187+
val rowB2 = toCatalyst(Row("A")).asInstanceOf[InternalRow]
188+
assert(rowOrdering.compare(rowB1, rowB2) > 0)
189+
}
190+
}
169191
}

0 commit comments

Comments
 (0)