
[HUDI-8664] adapt TestSparkSqlCoreFlow for hudi stream API #12602

Open
wants to merge 2 commits into master from HUDI-8664
Conversation

Davis-Zhang-Onehouse (Contributor):

Change Logs

Fix the broken test, as we changed hudi_table_change to use Spark streaming.

Impact

TestSparkSqlCoreFlow is all green now.

Risk level (write none, low, medium, or high below)

none

Documentation Update

none

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@github-actions bot added the size:M label (PR with lines of changes in (100, 300]) on Jan 8, 2025
@@ -76,14 +76,22 @@ public static List<String> listCommitsSince(HoodieStorage storage, String basePa

// this is used in the integration test script: docker/demo/sparksql-incremental.commands
public static List<String> listCompletionTimeSince(FileSystem fs, String basePath,
String instantTimestamp) {
String instantTimestamp) {
Contributor:

Could this method use listCompletedInstantSince (to return Stream<HoodieInstant>) to avoid code duplication on the same logic?

Author:

done
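The refactor agreed above can be sketched in isolation. This is a hedged illustration, not Hudi's actual code: `CommitInstant` is a hypothetical stand-in for `HoodieInstant`, and the two methods mirror `listCompletedInstantSince` and `listCompletionTimeSince` only in shape, showing how the second becomes a thin projection over the first instead of duplicating the timeline filtering.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical stand-in for Hudi's HoodieInstant, for illustration only.
record CommitInstant(String timestamp, String completionTime) {}

public class CompletionTimes {
    // Stand-in for listCompletedInstantSince: the single place that filters the timeline.
    static Stream<CommitInstant> listCompletedInstantSince(List<CommitInstant> timeline, String since) {
        return timeline.stream().filter(i -> i.timestamp().compareTo(since) > 0);
    }

    // listCompletionTimeSince becomes a projection over that stream instead of
    // re-implementing the same filtering logic.
    static List<String> listCompletionTimeSince(List<CommitInstant> timeline, String since) {
        return listCompletedInstantSince(timeline, since)
                .map(CommitInstant::completionTime)
                .collect(Collectors.toList());
    }
}
```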

* Returns the last successful write operation's completed instant.
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public static HoodieInstant latestCompletedCommitCompletionTime(FileSystem fs, String basePath) {
Contributor:

Similar here on extracting the common functionality with latestCommit (two methods above).

Author:

done
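The extraction suggested here — sharing the common lookup between latestCommit and the new completed-commit variant — can be sketched as follows. All names are hypothetical stand-ins (`TimelineInstant` for `HoodieInstant`, a `List` for the timeline); the point is only the shape: both public lookups reduce to different filters over one shared helper.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in for Hudi's HoodieInstant, for illustration only.
record TimelineInstant(String timestamp, boolean completed) {}

public class LatestInstantLookup {
    // One shared helper holds the "last instant of a filtered timeline" logic ...
    static TimelineInstant latestInstant(List<TimelineInstant> timeline, Predicate<TimelineInstant> filter) {
        return timeline.stream()
                .filter(filter)
                .reduce((first, second) -> second)   // keep the last matching instant
                .orElseThrow(() -> new IllegalStateException("no matching instant"));
    }

    // ... so the two public lookups differ only in the filter they pass.
    static TimelineInstant latestCommit(List<TimelineInstant> timeline) {
        return latestInstant(timeline, i -> true);
    }

    static TimelineInstant latestCompletedCommit(List<TimelineInstant> timeline) {
        return latestInstant(timeline, TimelineInstant::completed);
    }
}
```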

@@ -30,16 +30,15 @@ import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.keygen.NonpartitionedKeyGenerator
import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
- import org.apache.hudi.{DataSourceReadOptions, HoodieSparkUtils}
+ import org.apache.hudi.DataSourceReadOptions
Contributor:

nit: keep import grouping

Author:

done

@@ -91,87 +90,101 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
val dataGen = new HoodieTestDataGenerator(HoodieTestDataGenerator.TRIP_NESTED_EXAMPLE_SCHEMA, 0xDEED)

//Bulk insert first set of records
- val inputDf0 = generateInserts(dataGen, "000", 100).cache()
+ val inputDf0 = generateInserts(dataGen, "000", 10).cache()
Contributor:

keep the record count the same as before

Contributor:

same for other places

Author:

done

* Returns the last successful write operation's completed instant.
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public static HoodieInstant latestCompletedCommitCompletionTime(FileSystem fs, String basePath) {
Contributor:

Suggested change:
- public static HoodieInstant latestCompletedCommitCompletionTime(FileSystem fs, String basePath) {
+ public static HoodieInstant latestCompletedCommit(FileSystem fs, String basePath) {

Author:

done

return timeline.lastInstant().get();
}

public static HoodieInstant latestCompletedCommitCompletionTime(HoodieStorage storage, String basePath) {
Contributor:

Suggested change:
- public static HoodieInstant latestCompletedCommitCompletionTime(HoodieStorage storage, String basePath) {
+ public static HoodieInstant latestCompletedCommit(HoodieStorage storage, String basePath) {

Author:

done

assertEquals(100, snapshotDf2.count())
compareUpdateDfWithHudiDf(updateDf, snapshotDf2, snapshotDf1)
snapshotDf2.unpersist(true)
val commitCompletedInstant2 = latestCompletedCommitCompletionTime(fs, tableBasePath)
Contributor:

Suggested change:
- val commitCompletedInstant2 = latestCompletedCommitCompletionTime(fs, tableBasePath)
+ val commitInstant2 = latestCompletedCommitCompletionTime(fs, tableBasePath)

Author:

done for all

@@ -91,87 +90,101 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {

val uniqueKeyCnt2 = inputDf2.select("_row_key").distinct().count()
insertInto(tableName, tableBasePath, inputDf2, UPSERT, isMetadataEnabled, 3)
val commitInstantTime3 = latestCommit(fs, tableBasePath)
val commitCompletedInstant3 = latestCompletedCommitCompletionTime(fs, tableBasePath)
Contributor:

Suggested change:
- val commitCompletedInstant3 = latestCompletedCommitCompletionTime(fs, tableBasePath)
+ val commitInstant3 = latestCompletedCommitCompletionTime(fs, tableBasePath)

Author:

done


// Read Incremental Query, uses hudi_table_changes() table valued function for spark sql
// we have 2 commits, try pulling the first commit (which is not the latest)
//HUDI-5266
- val firstCommit = listCommitsSince(fs, tableBasePath, "000").get(0)
+ val firstCommitInstant = listCompletedInstantSince(fs, tableBasePath, "000").get(0)
+ val firstCommit = firstCommitInstant.getCompletionTime
Contributor:

Suggested change:
- val firstCommit = firstCommitInstant.getCompletionTime
+ val firstCommitCompletionTime = firstCommitInstant.getCompletionTime

Contributor:

Let's make sure all variables have consistent naming.

Author:

done

val beforeRowMap = beforeRows.map(row => getRowKey(row) -> row).toMap

// Check that all input rows exist in hudiRows
inputRows.foreach { inputRow =>
Contributor:

Could we just sort the list by record key and do row comparison? Will that code be easier to understand?

Author:

I was thinking the same, but it is not simpler in practice: we would need to keep three indices over the three arrays, search both inputRows and beforeRows for each row in hudiRows, and handle the various cases where a key cannot be found; in total it comes to ~100 lines of code.

I can do that if required, but the current version is the most concise (though not the most efficient; we are only handling a couple hundred rows).
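The map-based comparison the author defends can be sketched in a self-contained form. This is an illustration only: `Row` here is a hypothetical one-key, one-value record standing in for Spark's `Row`, and the method name is invented. The point is the pattern — index the actual rows by record key once, then look each expected row up by key, instead of walking sorted arrays with parallel indices.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class RowComparison {
    // Hypothetical stand-in for Spark's Row, for illustration only.
    record Row(String key, String value) {}

    static void assertAllInputRowsPresent(Row[] inputRows, Row[] hudiRows) {
        // Build the key -> row index once (O(n)), ...
        Map<String, Row> hudiByKey = Arrays.stream(hudiRows)
                .collect(Collectors.toMap(Row::key, Function.identity()));
        // ... then each expected row is a single map lookup.
        for (Row input : inputRows) {
            Row actual = hudiByKey.get(input.key());
            if (actual == null) {
                throw new AssertionError("row key not found: " + input.key());
            }
            if (!actual.value().equals(input.value())) {
                throw new AssertionError("value mismatch for key: " + input.key());
            }
        }
    }
}
```

The missing-key case is handled explicitly, which is the part that bloats the sorted-array alternative.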

val hudiDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " from hudiTbl")
val inputDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " from inputTbl")
val beforeDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " from beforeTbl")
def compareUpdateDfWithHudiRows(inputRows: Array[Row], hudiRows: Array[Row], beforeRows: Array[Row]): Unit = {
Contributor:

Does this achieve almost the same functionality as compareUpdateRowsWithHudiRows? Could we keep only one of them?

Author:

I can do that; it will require refactoring other consumers as well.

@@ -331,6 +401,32 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
assertEquals(hudiDfToCompare.except(inputDfToCompare).count, 0)
}

private def compareEntireInputRowsWithHudiRows(snapshotDf2Rows: Array[Row], timeTravelDfRows: Array[Row]): Unit = {
Contributor:

Suggested change:
- private def compareEntireInputRowsWithHudiRows(snapshotDf2Rows: Array[Row], timeTravelDfRows: Array[Row]): Unit = {
+ private def compareEntireInputRowsWithHudiRows(expectedRows: Array[Row], actualRows: Array[Row]): Unit = {

Author:

done

}
}

def compareUpdateRowsWithHudiRows(inputRows: Array[Row], hudiRows: Array[Row], beforeRows: Array[Row]): Unit = {
Contributor:

Suggested change:
- def compareUpdateRowsWithHudiRows(inputRows: Array[Row], hudiRows: Array[Row], beforeRows: Array[Row]): Unit = {
+ def compareUpdateRowsWithHudiRows(expectedRows: Array[Row], actualUpdateRows: Array[Row], actualRows: Array[Row]): Unit = {

Contributor:

Could you name them properly based on how they are used?

Author:

done

@@ -331,6 +401,32 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
assertEquals(hudiDfToCompare.except(inputDfToCompare).count, 0)
}

private def compareEntireInputRowsWithHudiRows(snapshotDf2Rows: Array[Row], timeTravelDfRows: Array[Row]): Unit = {
Contributor:

Is it possible to consolidate this with compareUpdateRowsWithHudiRows as well?

Author:

done.

@Davis-Zhang-Onehouse force-pushed the HUDI-8664 branch 3 times, most recently from 3192ada to 4d43cbd on January 9, 2025 02:09
@hudi-bot commented Jan 9, 2025:

CI report:

@hudi-bot supports the following bot commands:
  • @hudi-bot run azure: re-run the last Azure build
