[HUDI-8664] adapt TestSparkSqlCoreFlow for hudi stream API #12602
base: master
Conversation
Force-pushed from c9c705c to f77c625
@@ -76,14 +76,22 @@ public static List<String> listCommitsSince(HoodieStorage storage, String basePath

// this is used in the integration test script: docker/demo/sparksql-incremental.commands
public static List<String> listCompletionTimeSince(FileSystem fs, String basePath,
                                                   String instantTimestamp) {
Could this method use listCompletedInstantSince (to return Stream<HoodieInstant>) to avoid code duplication on the same logic?
done
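A minimal sketch of the agreed refactor, assuming listCompletedInstantSince returns the Stream<HoodieInstant> proposed above (if the final code returns a List instead, a .stream() call is needed first):

import java.util.List;
import java.util.stream.Collectors;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.table.timeline.HoodieInstant;

// Sketch: delegate to listCompletedInstantSince so the timeline-filtering logic
// lives in one place, then map each completed instant to its completion time.
public static List<String> listCompletionTimeSince(FileSystem fs, String basePath,
                                                   String instantTimestamp) {
  return listCompletedInstantSince(fs, basePath, instantTimestamp)
      .map(HoodieInstant::getCompletionTime)
      .collect(Collectors.toList());
}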
 * Returns the last successful write operation's completed instant.
 */
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public static HoodieInstant latestCompletedCommitCompletionTime(FileSystem fs, String basePath) {
Similar here on extracting the common functionality with latestCommit (two methods above).
done
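A sketch of that extraction, assuming both methods can share one lookup of the last completed instant (allCompletedCommitsCompactions is the existing helper in this class; the delegation itself is the assumption):

import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;

// Shared lookup: the last completed commit/compaction instant on the timeline.
public static HoodieInstant latestCompletedCommit(FileSystem fs, String basePath) {
  HoodieTimeline timeline = allCompletedCommitsCompactions(fs, basePath);
  return timeline.lastInstant().get();
}

// latestCommit then delegates instead of repeating the timeline scan
// (requestedTime() is the 1.x accessor; older releases used getTimestamp()).
public static String latestCommit(FileSystem fs, String basePath) {
  return latestCompletedCommit(fs, basePath).requestedTime();
}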
@@ -30,16 +30,15 @@ import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.keygen.NonpartitionedKeyGenerator
import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
- import org.apache.hudi.{DataSourceReadOptions, HoodieSparkUtils}

+ import org.apache.hudi.DataSourceReadOptions
nit: keep import grouping
done
@@ -91,87 +90,101 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
val dataGen = new HoodieTestDataGenerator(HoodieTestDataGenerator.TRIP_NESTED_EXAMPLE_SCHEMA, 0xDEED)

//Bulk insert first set of records
- val inputDf0 = generateInserts(dataGen, "000", 100).cache()
+ val inputDf0 = generateInserts(dataGen, "000", 10).cache()
keep the record count the same as before
same for other places
done
 * Returns the last successful write operation's completed instant.
 */
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public static HoodieInstant latestCompletedCommitCompletionTime(FileSystem fs, String basePath) {
Suggested change:
- public static HoodieInstant latestCompletedCommitCompletionTime(FileSystem fs, String basePath) {
+ public static HoodieInstant latestCompletedCommit(FileSystem fs, String basePath) {
done
  return timeline.lastInstant().get();
}

public static HoodieInstant latestCompletedCommitCompletionTime(HoodieStorage storage, String basePath) {
Suggested change:
- public static HoodieInstant latestCompletedCommitCompletionTime(HoodieStorage storage, String basePath) {
+ public static HoodieInstant latestCompletedCommit(HoodieStorage storage, String basePath) {
done
assertEquals(100, snapshotDf2.count())
compareUpdateDfWithHudiDf(updateDf, snapshotDf2, snapshotDf1)
snapshotDf2.unpersist(true)
val commitCompletedInstant2 = latestCompletedCommitCompletionTime(fs, tableBasePath)
Suggested change:
- val commitCompletedInstant2 = latestCompletedCommitCompletionTime(fs, tableBasePath)
+ val commitInstant2 = latestCompletedCommitCompletionTime(fs, tableBasePath)
done for all
val uniqueKeyCnt2 = inputDf2.select("_row_key").distinct().count()
insertInto(tableName, tableBasePath, inputDf2, UPSERT, isMetadataEnabled, 3)
val commitInstantTime3 = latestCommit(fs, tableBasePath)
val commitCompletedInstant3 = latestCompletedCommitCompletionTime(fs, tableBasePath)
Suggested change:
- val commitCompletedInstant3 = latestCompletedCommitCompletionTime(fs, tableBasePath)
+ val commitInstant3 = latestCompletedCommitCompletionTime(fs, tableBasePath)
done
// Read Incremental Query, uses hudi_table_changes() table valued function for spark sql
// we have 2 commits, try pulling the first commit (which is not the latest)
//HUDI-5266
- val firstCommit = listCommitsSince(fs, tableBasePath, "000").get(0)
+ val firstCommitInstant = listCompletedInstantSince(fs, tableBasePath, "000").get(0)
+ val firstCommit = firstCommitInstant.getCompletionTime
Suggested change:
- val firstCommit = firstCommitInstant.getCompletionTime
+ val firstCommitCompletionTime = firstCommitInstant.getCompletionTime
Let's make sure all variables have consistent naming.
done
val beforeRowMap = beforeRows.map(row => getRowKey(row) -> row).toMap

// Check that all input rows exist in hudiRows
inputRows.foreach { inputRow =>
Could we just sort the list by record key and do row comparison? Will that code be easier to understand?
I was thinking the same, but it does not work out that way: we would need to keep three indices for the three arrays and search both inputRows and beforeRows for each row in hudiRows, and we would also have to handle the cases where a key cannot be found. In total that leads to ~100 lines of code. I can do it if required, but the current version is the most concise one (though not the most efficient, which is acceptable since we are only handling a couple of hundred rows).
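For reference, a sketch of the concise map-based comparison being described, as a standalone method: getRowKey is the existing test helper used above, while sketchCompare and rowsMatch are hypothetical names for illustration.

import org.apache.spark.sql.Row

// Sketch of the map-based comparison: one key -> row map per snapshot.
// rowsMatch is a hypothetical equality check over the compared columns.
def sketchCompare(inputRows: Array[Row], hudiRows: Array[Row], beforeRows: Array[Row]): Unit = {
  val inputRowMap = inputRows.map(row => getRowKey(row) -> row).toMap
  val beforeRowMap = beforeRows.map(row => getRowKey(row) -> row).toMap
  hudiRows.foreach { hudiRow =>
    val key = getRowKey(hudiRow)
    inputRowMap.get(key) match {
      // the key was updated in this batch: the Hudi row must match the new input row
      case Some(updatedRow) => assert(rowsMatch(updatedRow, hudiRow))
      // untouched key: the Hudi row must still match the row from the prior snapshot
      case None => assert(beforeRowMap.get(key).exists(rowsMatch(_, hudiRow)))
    }
  }
}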
val hudiDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " from hudiTbl")
val inputDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " from inputTbl")
val beforeDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " from beforeTbl")
def compareUpdateDfWithHudiRows(inputRows: Array[Row], hudiRows: Array[Row], beforeRows: Array[Row]): Unit = {
Does this achieve almost the same functionality as compareUpdateRowsWithHudiRows? Could we keep only one of them?
I can do that; it will require refactoring other consumers as well.
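A sketch of what an interim consolidation could look like, assuming compareUpdateRowsWithHudiRows is kept as the single implementation and the old entry point becomes a thin forwarder until the remaining consumers are refactored (the forwarder is the assumption here):

import org.apache.spark.sql.Row

// Sketch: a thin forwarder keeps existing callers compiling while only one
// comparison implementation remains.
private def compareUpdateDfWithHudiRows(inputRows: Array[Row], hudiRows: Array[Row],
                                        beforeRows: Array[Row]): Unit =
  compareUpdateRowsWithHudiRows(inputRows, hudiRows, beforeRows)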
@@ -331,6 +401,32 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
  assertEquals(hudiDfToCompare.except(inputDfToCompare).count, 0)
}

private def compareEntireInputRowsWithHudiRows(snapshotDf2Rows: Array[Row], timeTravelDfRows: Array[Row]): Unit = {
Suggested change:
- private def compareEntireInputRowsWithHudiRows(snapshotDf2Rows: Array[Row], timeTravelDfRows: Array[Row]): Unit = {
+ private def compareEntireInputRowsWithHudiRows(expectedRows: Array[Row], actualRows: Array[Row]): Unit = {
done
  }
}

def compareUpdateRowsWithHudiRows(inputRows: Array[Row], hudiRows: Array[Row], beforeRows: Array[Row]): Unit = {
Suggested change:
- def compareUpdateRowsWithHudiRows(inputRows: Array[Row], hudiRows: Array[Row], beforeRows: Array[Row]): Unit = {
+ def compareUpdateRowsWithHudiRows(expectedRows: Array[Row], actualUpdateRows: Array[Row], actualRows: Array[Row]): Unit = {
Could you name them properly based on how they are used?
done
@@ -331,6 +401,32 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
  assertEquals(hudiDfToCompare.except(inputDfToCompare).count, 0)
}

private def compareEntireInputRowsWithHudiRows(snapshotDf2Rows: Array[Row], timeTravelDfRows: Array[Row]): Unit = {
Is it possible to consolidate this with compareUpdateRowsWithHudiRows as well?
done.
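One possible shape for that consolidation, as a hedged sketch: treat the entire-input check as an update comparison with an empty "before" set, assuming compareUpdateRowsWithHudiRows matches every actual row against the expected rows when beforeRows is empty.

import org.apache.spark.sql.Row

// Sketch (assumption): a full-snapshot comparison is an update comparison in
// which no row has a prior version, so the "before" set is empty.
private def compareEntireInputRowsWithHudiRows(expectedRows: Array[Row],
                                               actualRows: Array[Row]): Unit =
  compareUpdateRowsWithHudiRows(expectedRows, actualRows, Array.empty[Row])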
Force-pushed from 3192ada to 4d43cbd
Force-pushed from 4d43cbd to efc1e23
Change Logs
Fix the broken test after switching from hudi_table_changes to the Spark streaming API.
Impact
TestSparkSqlCoreFlow is all green now.
Risk level (write none, low, medium or high below)
none
Documentation Update
none
Contributor's checklist