[HUDI-8817] fix backward compatibility issue #12615

Draft: wants to merge 1 commit into master
File: HoodieWriteConfig.java
@@ -26,7 +26,6 @@
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieIndexingConfig;
import org.apache.hudi.common.config.HoodieMemoryConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
@@ -35,6 +34,7 @@
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.HoodieTableServiceManagerConfig;
import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
import org.apache.hudi.common.config.HoodieWriteBaseConfig;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType;
@@ -120,7 +120,7 @@
groupName = ConfigGroups.Names.WRITE_CLIENT,
description = "Configurations that control write behavior on Hudi tables. These can be directly passed down from even "
+ "higher level frameworks (e.g Spark datasources, Flink sink) and utilities (e.g Hudi Streamer).")
public class HoodieWriteConfig extends HoodieConfig {
public class HoodieWriteConfig extends HoodieWriteBaseConfig {

private static final Logger LOG = LoggerFactory.getLogger(HoodieWriteConfig.class);
private static final long serialVersionUID = 0L;
@@ -138,16 +138,6 @@ public class HoodieWriteConfig extends HoodieConfig {
.noDefaultValue()
.withDocumentation("Table name that will be used for registering with metastores like HMS. Needs to be same across runs.");

public static final ConfigProperty<Integer> WRITE_TABLE_VERSION = ConfigProperty
.key("hoodie.write.table.version")
.defaultValue(HoodieTableVersion.current().versionCode())
.withValidValues(
String.valueOf(HoodieTableVersion.SIX.versionCode()),
String.valueOf(HoodieTableVersion.current().versionCode())
)
.sinceVersion("1.0.0")
.withDocumentation("The table version this writer is storing the table in. This should match the current table version.");

public static final ConfigProperty<Boolean> AUTO_UPGRADE_VERSION = ConfigProperty
.key("hoodie.write.auto.upgrade")
.defaultValue(true)
@@ -232,17 +222,6 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Enables a more efficient mechanism for rollbacks based on the marker files generated "
+ "during the writes. Turned on by default.");

public static final ConfigProperty<String> TIMELINE_LAYOUT_VERSION_NUM = ConfigProperty
.key("hoodie.timeline.layout.version")
.defaultValue(Integer.toString(TimelineLayoutVersion.CURR_VERSION))
.withValidValues(Integer.toString(TimelineLayoutVersion.VERSION_0),
Integer.toString(TimelineLayoutVersion.VERSION_1),
Integer.toString(TimelineLayoutVersion.VERSION_2))
.markAdvanced()
.sinceVersion("0.5.1")
.withDocumentation("Controls the layout of the timeline. Version 0 relied on renames, Version 1 (default) models "
+ "the timeline as an immutable log relying only on atomic writes for object storage.");

public static final ConfigProperty<HoodieFileFormat> BASE_FILE_FORMAT = ConfigProperty
.key("hoodie.base.file.format")
.defaultValue(HoodieFileFormat.PARQUET)
@@ -1283,14 +1262,6 @@ public String getWriteSchema() {
return getSchema();
}

public HoodieTableVersion getWriteVersion() {
Integer versionCode = getInt(WRITE_TABLE_VERSION);
if (versionCode != null) {
WRITE_TABLE_VERSION.checkValues(versionCode.toString());
}
return HoodieTableVersion.fromVersionCode(getIntOrDefault(WRITE_TABLE_VERSION));
}

public boolean autoUpgrade() {
return getBoolean(AUTO_UPGRADE_VERSION);
}
@@ -1378,10 +1349,6 @@ public boolean shouldUseExternalSchemaTransformation() {
return getBoolean(AVRO_EXTERNAL_SCHEMA_TRANSFORMATION_ENABLE);
}

public Integer getTimelineLayoutVersion() {
return getInt(TIMELINE_LAYOUT_VERSION_NUM);
}

public int getBulkInsertShuffleParallelism() {
return getInt(BULKINSERT_PARALLELISM_VALUE);
}
File: HoodieWriteBaseConfig.java (new file)
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.config;

import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;

import javax.annotation.concurrent.Immutable;

import java.util.Properties;

@Immutable
public class HoodieWriteBaseConfig extends HoodieConfig {
protected HoodieWriteBaseConfig(Properties props) {
super(props);
}

protected HoodieWriteBaseConfig() {
super();
}

public static final ConfigProperty<Integer> WRITE_TABLE_VERSION = ConfigProperty
.key("hoodie.write.table.version")
.defaultValue(HoodieTableVersion.current().versionCode())
.withValidValues(
String.valueOf(HoodieTableVersion.SIX.versionCode()),
String.valueOf(HoodieTableVersion.current().versionCode())
)
.sinceVersion("1.0.0")
.withDocumentation("The table version this writer is storing the table in. This should match the current table version.");

public static final ConfigProperty<String> TIMELINE_LAYOUT_VERSION_NUM = ConfigProperty
.key("hoodie.timeline.layout.version")
.defaultValue(Integer.toString(TimelineLayoutVersion.CURR_VERSION))
.withValidValues(Integer.toString(TimelineLayoutVersion.VERSION_0),
Integer.toString(TimelineLayoutVersion.VERSION_1),
Integer.toString(TimelineLayoutVersion.VERSION_2))
.markAdvanced()
.sinceVersion("0.5.1")
.withDocumentation("Controls the layout of the timeline. Version 0 relied on renames, Version 1 (default) models "
+ "the timeline as an immutable log relying only on atomic writes for object storage.");

public HoodieTableVersion getWriteVersion() {
Integer versionCode = getInt(WRITE_TABLE_VERSION);
if (versionCode != null) {
WRITE_TABLE_VERSION.checkValues(versionCode.toString());
}
return HoodieTableVersion.fromVersionCode(getIntOrDefault(WRITE_TABLE_VERSION));
}

public Integer getTimelineLayoutVersion() {
return getInt(TIMELINE_LAYOUT_VERSION_NUM);
}
}
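
Since both constructors of HoodieWriteBaseConfig are protected, its getters surface only through subclasses such as HoodieWriteConfig. A minimal Scala sketch of the resolution behavior, assuming a throwaway subclass (TestWriteConfig below is illustrative, not part of this PR):

import java.util.Properties

import org.apache.hudi.common.config.HoodieWriteBaseConfig
import org.apache.hudi.common.config.HoodieWriteBaseConfig.WRITE_TABLE_VERSION

// Throwaway subclass, only to reach the protected constructor.
class TestWriteConfig(props: Properties) extends HoodieWriteBaseConfig(props)

object WriteVersionSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty(WRITE_TABLE_VERSION.key, "6") // one of the two valid values
    // An explicit value is validated via checkValues and mapped to the enum.
    println(new TestWriteConfig(props).getWriteVersion) // SIX
    // With the key absent, getIntOrDefault falls back to HoodieTableVersion.current().
    println(new TestWriteConfig(new Properties()).getWriteVersion)
  }
}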
File: HoodieTableMetaClient.java
@@ -80,6 +80,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.common.config.HoodieWriteBaseConfig.WRITE_TABLE_VERSION;
import static org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
@@ -1271,10 +1272,24 @@ public TableBuilder fromProperties(Properties properties) {
setTableName(hoodieConfig.getString(HoodieTableConfig.NAME));
}

if (hoodieConfig.contains(VERSION)) {
if (hoodieConfig.contains(VERSION) && hoodieConfig.contains(WRITE_TABLE_VERSION)) {
Review comment (Contributor):

The logic in the meta client should not consider the write table version. Otherwise, there is no point in checking whether hoodie.write.table.version and hoodie.table.version match.

CommonClientUtils#validateTableVersion has the validation. Does the Spark SQL writer bypass it?

Reply (Author):

Yes, the SQL writer does not call this function at all for CREATE TABLE statements.

LOG.warn("Detected both {} and {} are configured, {} would be adopted and {} is ignored.",
VERSION.key(), WRITE_TABLE_VERSION.key(), VERSION.key(), WRITE_TABLE_VERSION.key());
setTableVersion(hoodieConfig.getInt(VERSION));
} else {
if (hoodieConfig.contains(VERSION)) {
setTableVersion(hoodieConfig.getInt(VERSION));
}
if (hoodieConfig.contains(WRITE_TABLE_VERSION)) {
LOG.warn("Detected writer config {} to be used for setting table config {}. It is suggested to"
+ " always explicitly set {} instead.",
WRITE_TABLE_VERSION.key(), VERSION.key(), VERSION.key());
setTableVersion(hoodieConfig.getInt(WRITE_TABLE_VERSION));
}
}

if (hoodieConfig.contains(TIMELINE_PATH)) {
setTimelinePath(hoodieConfig.getString(TIMELINE_PATH));
}
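
The net effect of the new branch: hoodie.table.version always wins, and hoodie.write.table.version is consulted only when the table-level key is absent; as the review thread notes, this matters because the SQL CREATE TABLE path never reaches CommonClientUtils#validateTableVersion. A standalone Scala restatement of the precedence rule, for illustration only (the real logic lives in TableBuilder#fromProperties above):

import org.apache.hudi.common.config.HoodieConfig
import org.apache.hudi.common.config.HoodieWriteBaseConfig.WRITE_TABLE_VERSION
import org.apache.hudi.common.table.HoodieTableConfig.VERSION

object TableVersionPrecedenceSketch {
  // Returns the version code the builder would adopt, or None to keep the default.
  def resolve(hoodieConfig: HoodieConfig): Option[Int] = {
    if (hoodieConfig.contains(VERSION)) {
      // Table config present: adopt it and ignore any writer-side value
      // (the real code logs a warning when both keys are set).
      Some(hoodieConfig.getInt(VERSION))
    } else if (hoodieConfig.contains(WRITE_TABLE_VERSION)) {
      // Writer config only: fall back to it, with a warning suggesting VERSION instead.
      Some(hoodieConfig.getInt(WRITE_TABLE_VERSION))
    } else {
      None // keep the builder's default table version
    }
  }
}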
File: DefaultSource.scala
@@ -22,14 +22,15 @@ import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPER
import org.apache.hudi.cdc.CDCRelation
import org.apache.hudi.common.HoodieSchemaNotFoundException
import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig}
import org.apache.hudi.common.config.HoodieWriteBaseConfig.WRITE_TABLE_VERSION
import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
import org.apache.hudi.common.model.WriteConcurrencyMode
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, HoodieTableVersion, TableSchemaResolver}
import org.apache.hudi.common.table.log.InstantRange.RangeType
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.common.util.{ConfigUtils, TablePathUtils}
import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
import org.apache.hudi.config.HoodieWriteConfig.{WRITE_CONCURRENCY_MODE, WRITE_TABLE_VERSION}
import org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.io.storage.HoodieSparkIOFactory
File: HoodieSparkSqlWriter.scala
@@ -48,7 +48,7 @@ import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, T
import org.apache.hudi.common.util.ConfigUtils.getAllConfigKeys
import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option => HOption}
import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME}
import org.apache.hudi.config.HoodieWriteConfig.{SPARK_SQL_MERGE_INTO_PREPPED_KEY, WRITE_TABLE_VERSION}
import org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieInternalConfig, HoodieWriteConfig}
import org.apache.hudi.exception.{HoodieException, HoodieRecordCreationException, HoodieWriteConflictException}
import org.apache.hudi.hadoop.fs.HadoopFSUtils
@@ -750,7 +750,7 @@ class HoodieSparkSqlWriterInternal {
.setTableType(HoodieTableType.valueOf(tableType))
.setTableName(tableName)
.setRecordKeyFields(recordKeyFields)
.setTableVersion(hoodieConfig.getIntOrDefault(WRITE_TABLE_VERSION))
.setTableVersion(hoodieConfig.getIntOrDefault(HoodieWriteBaseConfig.WRITE_TABLE_VERSION))
.setArchiveLogFolder(archiveLogFolder)
.setPayloadClassName(payloadClass)
.setRecordMergeMode(RecordMergeMode.getValue(hoodieConfig.getString(HoodieWriteConfig.RECORD_MERGE_MODE)))
File: TestDefaultEightWriteSix.scala (new file)
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.functional

import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals}
import org.apache.hudi.common.config.HoodieWriteBaseConfig.WRITE_TABLE_VERSION
import org.apache.hudi.common.table.HoodieTableConfig.VERSION
import org.apache.hudi.hadoop.fs.HadoopFSUtils

class TestDefaultEightWriteSix extends HoodieSparkSqlTestBase {

test("Test default version eight write version six") {
withSparkSqlSessionConfig(VERSION.key() -> "8",
WRITE_TABLE_VERSION.key() -> "8") {
withTempDir { tmp =>
val tableName = generateTableName
val basePath = tmp.getCanonicalPath
val writeVersionCode = 6
spark.sql(
s"""
| create table $tableName (
| id int,
| ts long,
| dt string
| ) using hudi
| tblproperties (
| type = 'mor',
| primaryKey = 'id',
| preCombineField = 'ts',
| hoodie.write.table.version = $writeVersionCode
| )
| partitioned by(dt)
| location '$basePath'
""".stripMargin)
val metaClient = HoodieTableMetaClient.builder().setBasePath(basePath)
.setConf(HadoopFSUtils.getStorageConf(spark.sessionState.newHadoopConf())).build()
assertEquals(metaClient.getTableConfig.getTableVersion.versionCode(), writeVersionCode)
}
}
}

test("Test conflicting version config") {
withTempDir { tmp =>
val tableName = generateTableName
val basePath = tmp.getCanonicalPath
val tableVersionCode = 6
val writeVersionCode = 8
spark.sql(
s"""
| create table $tableName (
| id int,
| ts long,
| dt string
| ) using hudi
| tblproperties (
| type = 'mor',
| primaryKey = 'id',
| preCombineField = 'ts',
| hoodie.table.version = $tableVersionCode,
| hoodie.write.table.version = $writeVersionCode
| )
| partitioned by(dt)
| location '$basePath'
""".stripMargin)
val metaClient = HoodieTableMetaClient.builder().setBasePath(basePath)
.setConf(HadoopFSUtils.getStorageConf(spark.sessionState.newHadoopConf())).build()
assertEquals(metaClient.getTableConfig.getTableVersion.versionCode(), tableVersionCode)
}
}

test("Test default version eight write version six conflicting config") {
withTempDir { tmp =>
val tableName = generateTableName
val basePath = tmp.getCanonicalPath
val tableVersionCode = 6
assertNotEquals(VERSION.defaultValue().versionCode(), tableVersionCode)
spark.sql(
s"""
| create table $tableName (
| id int,
| ts long,
| dt string
| ) using hudi
| tblproperties (
| type = 'mor',
| primaryKey = 'id',
| preCombineField = 'ts',
| hoodie.table.version = "$tableVersionCode"
| )
| partitioned by(dt)
| location '$basePath'
""".stripMargin)
val metaClient = HoodieTableMetaClient.builder().setBasePath(basePath)
.setConf(HadoopFSUtils.getStorageConf(spark.sessionState.newHadoopConf())).build()
assertEquals(metaClient.getTableConfig.getTableVersion.versionCode(), tableVersionCode)
}
}
}
File: streaming read test (org.apache.hudi.functional)
@@ -19,11 +19,13 @@ package org.apache.hudi.functional

import org.apache.hudi.DataSourceReadOptions.{START_OFFSET, STREAMING_READ_TABLE_VERSION}
import org.apache.hudi.DataSourceWriteOptions.{PRECOMBINE_FIELD, RECORDKEY_FIELD}
import org.apache.hudi.common.config.HoodieWriteBaseConfig.WRITE_TABLE_VERSION
import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
import org.apache.hudi.common.table.{HoodieTableMetaClient, HoodieTableVersion}
import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.config.HoodieCompactionConfig
import org.apache.hudi.config.HoodieWriteConfig.{DELETE_PARALLELISM_VALUE, INSERT_PARALLELISM_VALUE, TBL_NAME, UPSERT_PARALLELISM_VALUE, WRITE_TABLE_VERSION}
import org.apache.hudi.config.HoodieWriteConfig.{DELETE_PARALLELISM_VALUE, INSERT_PARALLELISM_VALUE, TBL_NAME, UPSERT_PARALLELISM_VALUE}

import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.util.JavaConversions
import org.apache.spark.sql.streaming.StreamTest
File: HoodieSparkSqlTestBase.scala
@@ -287,6 +287,19 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
}
}

protected def withSparkSqlSessionConfig(configNameValues: (String, String)*)(f: => Unit): Unit = {
try {
configNameValues.foreach { case (configName, configValue) =>
spark.sql(s"set $configName=$configValue")
}
f
} finally {
configNameValues.foreach { case (configName, _) =>
spark.sql(s"reset $configName")
}
}
}

protected def withTable(tableName: String)(f: String => Unit): Unit = {
try {
f(tableName)
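
For reference, a sketch of how a suite might use the withSparkSqlSessionConfig helper added above (the test name, table schema, and values are illustrative); the finally block guarantees every key is reset even if the body throws:

// Inside a HoodieSparkSqlTestBase subclass, mirroring the new test above.
test("create table under pinned table and write versions") {
  withSparkSqlSessionConfig(
    "hoodie.table.version" -> "8",
    "hoodie.write.table.version" -> "8") {
    withTempDir { tmp =>
      spark.sql(s"create table ${generateTableName} (id int, dt string) using hudi " +
        s"partitioned by (dt) location '${tmp.getCanonicalPath}'")
    }
  }
}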
File: Source.java (hudi-utilities)
@@ -41,7 +41,7 @@

import java.io.Serializable;

import static org.apache.hudi.config.HoodieWriteConfig.WRITE_TABLE_VERSION;
import static org.apache.hudi.common.config.HoodieWriteBaseConfig.WRITE_TABLE_VERSION;

/**
* Represents a source from which we can tail data. Assumes a constructor that takes properties.