Skip to content

Commit 84b1f93

Browse files
committed
Fix non-nullable datetime when zeroDateTimeBehavior is CONVERT_TO_NULL.
1 parent 2b47611 commit 84b1f93

File tree

9 files changed

+159
-3
lines changed

9 files changed

+159
-3
lines changed

cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLConnector.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.cdap.plugin.cloudsql.mysql;
1818

19+
import com.google.common.collect.Maps;
1920
import io.cdap.cdap.api.annotation.Category;
2021
import io.cdap.cdap.api.annotation.Description;
2122
import io.cdap.cdap.api.annotation.Name;
@@ -75,7 +76,7 @@ public StructuredRecord transform(LongWritable longWritable, MysqlDBRecord mysql
7576

7677
@Override
7778
protected SchemaReader getSchemaReader(String sessionID) {
78-
return new MysqlSchemaReader(sessionID);
79+
return new MysqlSchemaReader(sessionID, Maps.fromProperties(config.getConnectionArgumentsProperties()));
7980
}
8081

8182
@Override

cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSource.java

+7
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,11 @@
3131
import io.cdap.plugin.common.Asset;
3232
import io.cdap.plugin.common.ConfigUtil;
3333
import io.cdap.plugin.common.LineageRecorder;
34+
import io.cdap.plugin.db.SchemaReader;
3435
import io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig;
3536
import io.cdap.plugin.db.source.AbstractDBSource;
3637
import io.cdap.plugin.mysql.MysqlDBRecord;
38+
import io.cdap.plugin.mysql.MysqlSchemaReader;
3739
import io.cdap.plugin.util.CloudSQLUtil;
3840
import io.cdap.plugin.util.DBUtils;
3941
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
@@ -120,6 +122,11 @@ protected LineageRecorder getLineageRecorder(BatchSourceContext context) {
120122
return new LineageRecorder(context, assetBuilder.build());
121123
}
122124

125+
@Override
126+
protected SchemaReader getSchemaReader() {
127+
return new MysqlSchemaReader(null, cloudsqlMysqlSourceConfig.getConnectionArguments());
128+
}
129+
123130
/** CloudSQL MySQL source config. */
124131
public static class CloudSQLMySQLSourceConfig extends AbstractDBSpecificSourceConfig {
125132

mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlConnector.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.cdap.plugin.mysql;
1818

19+
import com.google.common.collect.Maps;
1920
import io.cdap.cdap.api.annotation.Category;
2021
import io.cdap.cdap.api.annotation.Description;
2122
import io.cdap.cdap.api.annotation.Name;
@@ -62,7 +63,7 @@ public boolean supportSchema() {
6263

6364
@Override
6465
protected SchemaReader getSchemaReader(String sessionID) {
65-
return new MysqlSchemaReader(sessionID);
66+
return new MysqlSchemaReader(sessionID, Maps.fromProperties(config.getConnectionArgumentsProperties()));
6667
}
6768

6869
@Override

mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlConstants.java

+1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ private MysqlConstants() {
3939
public static final String TRUST_CERT_KEYSTORE_PASSWORD = "trustCertificateKeyStorePassword";
4040
public static final String MYSQL_CONNECTION_STRING_FORMAT = "jdbc:mysql://%s:%s/%s";
4141
public static final String USE_CURSOR_FETCH = "useCursorFetch";
42+
public static final String ZERO_DATE_TIME_BEHAVIOR = "zeroDateTimeBehavior";
4243

4344
/**
4445
* Query to set SQL_MODE system variable.

mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlSchemaReader.java

+34
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,16 @@
1616

1717
package io.cdap.plugin.mysql;
1818

19+
import com.google.common.collect.Lists;
1920
import io.cdap.cdap.api.data.schema.Schema;
2021
import io.cdap.plugin.db.CommonSchemaReader;
2122

23+
import java.sql.ResultSet;
2224
import java.sql.ResultSetMetaData;
2325
import java.sql.SQLException;
2426
import java.sql.Types;
27+
import java.util.List;
28+
import java.util.Map;
2529

2630
/**
2731
* Schema reader for mapping Mysql DB type
@@ -31,12 +35,42 @@ public class MysqlSchemaReader extends CommonSchemaReader {
3135
public static final String YEAR_TYPE_NAME = "YEAR";
3236
public static final String MEDIUMINT_UNSIGNED_TYPE_NAME = "MEDIUMINT UNSIGNED";
3337
private final String sessionID;
38+
private boolean zeroDateTimeToNull;
3439

3540
public MysqlSchemaReader(String sessionID) {
3641
super();
3742
this.sessionID = sessionID;
3843
}
3944

45+
public MysqlSchemaReader(String sessionID, Map<String, String> connectionArguments) {
46+
super();
47+
this.sessionID = sessionID;
48+
this.zeroDateTimeToNull = MysqlUtil.isZeroDateTimeToNull(connectionArguments);
49+
}
50+
51+
@Override
52+
public List<Schema.Field> getSchemaFields(ResultSet resultSet) throws SQLException {
53+
List<Schema.Field> schemaFields = Lists.newArrayList();
54+
ResultSetMetaData metadata = resultSet.getMetaData();
55+
// ResultSetMetadata columns are numbered starting with 1
56+
for (int i = 1; i <= metadata.getColumnCount(); i++) {
57+
if (shouldIgnoreColumn(metadata, i)) {
58+
continue;
59+
}
60+
61+
String columnName = metadata.getColumnName(i);
62+
Schema columnSchema = getSchema(metadata, i);
63+
64+
if (ResultSetMetaData.columnNullable == metadata.isNullable(i)
65+
|| (zeroDateTimeToNull && MysqlUtil.isDateTimeLikeType(metadata.getColumnType(i)))) {
66+
columnSchema = Schema.nullableOf(columnSchema);
67+
}
68+
Schema.Field field = Schema.Field.of(columnName, columnSchema);
69+
schemaFields.add(field);
70+
}
71+
return schemaFields;
72+
}
73+
4074
@Override
4175
public boolean shouldIgnoreColumn(ResultSetMetaData metadata, int index) throws SQLException {
4276
return metadata.getColumnName(index).equals("c_" + sessionID) ||

mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlSource.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ protected LineageRecorder getLineageRecorder(BatchSourceContext context) {
8181

8282
@Override
8383
protected SchemaReader getSchemaReader() {
84-
return new MysqlSchemaReader(null);
84+
return new MysqlSchemaReader(null, mysqlSourceConfig.getConnectionArguments());
8585
}
8686

8787
/**

mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlUtil.java

+17
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.google.common.collect.ImmutableMap;
2020

21+
import java.sql.Types;
2122
import java.util.Map;
2223

2324
/**
@@ -91,4 +92,20 @@ public static Map<String, String> composeDbSpecificArgumentsMap(Boolean autoReco
9192
public static String getConnectionString(String host, Integer port, String database) {
9293
return String.format(MysqlConstants.MYSQL_CONNECTION_STRING_FORMAT, host, port, database);
9394
}
95+
96+
public static boolean isDateTimeLikeType(int columnType) {
97+
int[] dateTimeLikeTypes = new int[]{Types.TIMESTAMP, Types.TIMESTAMP_WITH_TIMEZONE, Types.DATE};
98+
99+
for (int dttType : dateTimeLikeTypes) {
100+
if (dttType == columnType) {
101+
return true;
102+
}
103+
}
104+
return false;
105+
}
106+
107+
public static boolean isZeroDateTimeToNull(Map<String, String> connectionArguments) {
108+
String argValue = connectionArguments.getOrDefault(MysqlConstants.ZERO_DATE_TIME_BEHAVIOR, "");
109+
return argValue.equals("CONVERT_TO_NULL") || argValue.equals("convertToNull");
110+
}
94111
}

mysql-plugin/src/test/java/io/cdap/plugin/mysql/MysqlSchemaReaderUnitTest.java

+33
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,13 @@
2121
import org.junit.Test;
2222
import org.mockito.Mockito;
2323

24+
import java.sql.ResultSet;
2425
import java.sql.ResultSetMetaData;
2526
import java.sql.SQLException;
2627
import java.sql.Types;
28+
import java.util.HashMap;
29+
import java.util.List;
30+
import java.util.Map;
2731

2832
public class MysqlSchemaReaderUnitTest {
2933

@@ -37,4 +41,33 @@ public void validateYearTypeToStringTypeConversion() throws SQLException {
3741
Schema schema = schemaReader.getSchema(metadata, 1);
3842
Assert.assertTrue(Schema.of(Schema.Type.INT).equals(schema));
3943
}
44+
45+
@Test
46+
public void validateZeroDateTimeBehavior() throws SQLException {
47+
ResultSet resultSet = Mockito.mock(ResultSet.class);
48+
ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class);
49+
Mockito.when(resultSet.getMetaData()).thenReturn(metadata);
50+
51+
Mockito.when(metadata.getColumnCount()).thenReturn(1);
52+
Mockito.when(metadata.getColumnName(Mockito.eq(1))).thenReturn("some_date");
53+
54+
Mockito.when(metadata.getColumnType(Mockito.eq(1))).thenReturn(Types.DATE);
55+
Mockito.when(metadata.getColumnTypeName(Mockito.eq(1))).thenReturn(MysqlSchemaReader.YEAR_TYPE_NAME);
56+
57+
// non-nullable column
58+
Mockito.when(metadata.isNullable(Mockito.eq(1))).thenReturn(0);
59+
60+
// test that non-nullable date remains non-nullable when no conn arg is present
61+
MysqlSchemaReader schemaReader = new MysqlSchemaReader(null);
62+
List<Schema.Field> schemaFields = schemaReader.getSchemaFields(resultSet);
63+
Assert.assertFalse(schemaFields.get(0).getSchema().isNullable());
64+
65+
// test that it converts non-nullable date column to nullable when zeroDateTimeBehavior is convert to null
66+
Map<String, String> connectionArguments = new HashMap<>();
67+
connectionArguments.put("zeroDateTimeBehavior", "CONVERT_TO_NULL");
68+
69+
schemaReader = new MysqlSchemaReader(null, connectionArguments);
70+
schemaFields = schemaReader.getSchemaFields(resultSet);
71+
Assert.assertTrue(schemaFields.get(0).getSchema().isNullable());
72+
}
4073
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
2+
/*
3+
* Copyright © 2024 Cask Data, Inc.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
6+
* use this file except in compliance with the License. You may obtain a copy of
7+
* the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14+
* License for the specific language governing permissions and limitations under
15+
* the License.
16+
*/
17+
18+
package io.cdap.plugin.mysql;
19+
20+
import org.junit.Test;
21+
22+
import java.sql.Types;
23+
import java.util.HashMap;
24+
import java.util.Map;
25+
26+
import static org.junit.Assert.assertFalse;
27+
import static org.junit.Assert.assertTrue;
28+
29+
public class MysqlUtilUnitTest {
30+
31+
@Test
32+
public void testIsZeroDateTimeToNull() {
33+
Map<String, String> connArgsMap = new HashMap<>(1);
34+
35+
connArgsMap.put("zeroDateTimeBehavior", "");
36+
assertFalse(MysqlUtil.isZeroDateTimeToNull(connArgsMap));
37+
38+
connArgsMap.put("zeroDateTimeBehavior", "ROUND");
39+
assertFalse(MysqlUtil.isZeroDateTimeToNull(connArgsMap));
40+
41+
connArgsMap.put("zeroDateTimeBehavior", "CONVERT_TO_NULL");
42+
assertTrue(MysqlUtil.isZeroDateTimeToNull(connArgsMap));
43+
44+
connArgsMap.put("zeroDateTimeBehavior", "convertToNull");
45+
assertTrue(MysqlUtil.isZeroDateTimeToNull(connArgsMap));
46+
}
47+
48+
@Test
49+
public void testIsDateTimeLikeType() {
50+
int dateType = Types.DATE;
51+
int timestampType = Types.TIMESTAMP;
52+
int timestampWithTimezoneType = Types.TIMESTAMP_WITH_TIMEZONE;
53+
int timeType = Types.TIME;
54+
int stringType = Types.VARCHAR;
55+
56+
assertTrue(MysqlUtil.isDateTimeLikeType(dateType));
57+
assertTrue(MysqlUtil.isDateTimeLikeType(timestampType));
58+
assertTrue(MysqlUtil.isDateTimeLikeType(timestampWithTimezoneType));
59+
assertFalse(MysqlUtil.isDateTimeLikeType(timeType));
60+
assertFalse(MysqlUtil.isDateTimeLikeType(stringType));
61+
}
62+
}

0 commit comments

Comments
 (0)