Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>

<flink.sql.cdc.version>2.4.2</flink.sql.cdc.version>
<checkstyle.version>8.14</checkstyle.version>
<junit.version>4.13.2</junit.version>
<scala.binary.version>2.12</scala.binary.version>
Expand Down Expand Up @@ -177,6 +177,30 @@
<version>${flink.version}</version>
<scope>${flink.scope}</scope>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
<version>${flink.sql.cdc.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>flink-shaded-guava</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-oracle-cdc</artifactId>
<version>${flink.sql.cdc.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>flink-shaded-guava</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-table-planner_${scala.binary.version}</artifactId>-->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ public DynamicTableSink createDynamicTableSink(Context context) {
Properties databendProperties =
getDatabendProperties(context.getCatalogTable().getOptions());
return new DatabendDynamicTableSink(
getDmlOptions(config),
getDmlOptions(config, databendProperties),
databendProperties,
getDmlOptions(config).getPrimaryKeys(),
getDmlOptions(config, databendProperties).getPrimaryKeys(),
catalogTable.getPartitionKeys().toArray(new String[0]),
context.getPhysicalRowDataType());
}
Expand All @@ -63,7 +63,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
Properties databendProperties =
getDatabendProperties(context.getCatalogTable().getOptions());
return new DatabendDynamicTableSource(
getReadOptions(config), databendProperties, context.getPhysicalRowDataType());
getReadOptions(config, databendProperties), databendProperties, context.getPhysicalRowDataType());
}

@Override
Expand Down Expand Up @@ -102,7 +102,7 @@ private void validateConfigOptions(ReadableConfig config) {
}
}

public DatabendDmlOptions getDmlOptions(ReadableConfig config) {
public DatabendDmlOptions getDmlOptions(ReadableConfig config, Properties databendProperties) {
return new DatabendDmlOptions.Builder()
.withUrl(config.get(URL))
.withUsername(config.get(USERNAME))
Expand All @@ -116,16 +116,18 @@ public DatabendDmlOptions getDmlOptions(ReadableConfig config) {
.withPrimaryKey(config.get(SINK_PRIMARY_KEYS).toArray(new String[0]))
.withIgnoreDelete(config.get(SINK_IGNORE_DELETE))
.withParallelism(config.get(SINK_PARALLELISM))
.withConnectionProperties(databendProperties)
.build();
}

private DatabendReadOptions getReadOptions(ReadableConfig config) {
private DatabendReadOptions getReadOptions(ReadableConfig config, Properties databendProperties) {
return new DatabendReadOptions.Builder()
.withUrl(config.get(URL))
.withUsername(config.get(USERNAME))
.withPassword(config.get(PASSWORD))
.withDatabaseName(config.get(DATABASE_NAME))
.withTableName(config.get(TABLE_NAME))
.withConnectionProperties(databendProperties)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.apache.flink.connector.databend.catalog.databend;

/**
 * Supported table data models.
 *
 * <p>NOTE(review): the semantics of these constants are not defined anywhere in this file;
 * presumably they mirror Doris-style table models (duplicate rows allowed / unique key /
 * aggregate key) — confirm against the code that consumes this enum.
 */
public enum DataModel {
    DUPLICATE,
    UNIQUE,
    AGGREGATE
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package org.apache.flink.connector.databend.catalog.databend;

import org.apache.commons.compress.utils.Lists;
import org.apache.flink.annotation.Public;
import org.apache.flink.connector.databend.exception.DatabendRuntimeException;
import org.apache.flink.connector.databend.exception.DatabendSystemException;
import org.apache.flink.connector.databend.internal.connection.DatabendConnectionProvider;
import org.apache.flink.connector.databend.internal.options.DatabendConnectionOptions;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static org.apache.flink.util.Preconditions.checkArgument;

/**
 * Helper for issuing metadata queries and DDL against a Databend cluster over JDBC.
 *
 * <p>Wraps a {@link DatabendConnectionProvider} and offers convenience methods to list
 * databases/tables, test for existence, create databases, and render/execute
 * {@code CREATE TABLE} DDL from a {@link TableSchema}.
 */
@Public
public class DatabendSystem implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(DatabendSystem.class);

    /** System schemas hidden from {@link #listDatabases()}. */
    private static final List<String> builtinDatabases =
            Collections.singletonList("information_schema");

    private final DatabendConnectionProvider jdbcConnectionProvider;

    public DatabendSystem(DatabendConnectionOptions options) {
        this.jdbcConnectionProvider =
                new DatabendConnectionProvider(options, options.getConnectionProperties());
    }

    /** Returns all user-visible databases (built-in system schemas filtered out). */
    public List<String> listDatabases() {
        return extractColumnValuesBySQL(
                // Fixed: the original query carried a stray ";;" terminator.
                "select schema_name from information_schema.schemata",
                1,
                dbName -> !builtinDatabases.contains(dbName));
    }

    /**
     * Returns whether {@code database} exists.
     *
     * @throws IllegalArgumentException if {@code database} is null or blank
     */
    public boolean databaseExists(String database) {
        checkArgument(!StringUtils.isNullOrWhitespaceOnly(database));
        return listDatabases().contains(database);
    }

    /** Back-quotes every element of {@code name}. */
    private static List<String> identifier(List<String> name) {
        return name.stream().map(DatabendSystem::identifier).collect(Collectors.toList());
    }

    /**
     * Creates {@code database} if it does not already exist.
     *
     * <p>NOTE(review): the database name is interpolated unescaped into the DDL; callers are
     * expected to pass a trusted, valid identifier.
     */
    public boolean createDatabase(String database) {
        execute(String.format("CREATE DATABASE IF NOT EXISTS %s", database));
        return true;
    }

    /**
     * Executes a single SQL statement.
     *
     * @throws DatabendSystemException wrapping any driver/connection failure
     */
    public void execute(String sql) {
        try (Statement statement =
                jdbcConnectionProvider.getOrCreateConnection().createStatement()) {
            statement.execute(sql);
        } catch (Exception e) {
            throw new DatabendSystemException(
                    String.format("SQL query could not be executed: %s", sql), e);
        }
    }

    /** Returns whether {@code table} exists inside {@code database}. */
    public boolean tableExists(String database, String table) {
        return databaseExists(database) && listTables(database).contains(table);
    }

    /**
     * Lists the tables of {@code databaseName}.
     *
     * @throws DatabendRuntimeException if the database does not exist
     */
    public List<String> listTables(String databaseName) {
        if (!databaseExists(databaseName)) {
            // Fixed message: was "database<name> is not exists".
            throw new DatabendRuntimeException("database " + databaseName + " does not exist");
        }
        return extractColumnValuesBySQL(
                "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = ?",
                1,
                null,
                databaseName);
    }

    /** Renders and executes the {@code CREATE TABLE} DDL for {@code schema}. */
    public void createTable(TableSchema schema) {
        String ddl = buildCreateTableDDL(schema);
        LOG.info("Create table with ddl:{}", ddl);
        execute(ddl);
    }

    /** Builds a {@code CREATE TABLE IF NOT EXISTS} statement from {@code schema}. */
    public static String buildCreateTableDDL(TableSchema schema) {
        StringBuilder sb = new StringBuilder("CREATE TABLE IF NOT EXISTS ");
        sb.append(identifier(schema.getDatabase()))
                .append(".")
                .append(identifier(schema.getTable()))
                .append("(");

        Map<String, FieldSchema> fields = schema.getFields();
        for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) {
            buildColumn(sb, entry.getValue(), false);
        }
        // buildColumn leaves a trailing comma after each column definition; strip the last one.
        // Guarded so an empty field map never deletes the opening parenthesis instead.
        if (!fields.isEmpty()) {
            sb.deleteCharAt(sb.length() - 1);
        }
        sb.append(" ) ");

        // Optional table comment.
        if (!StringUtils.isNullOrWhitespaceOnly(schema.getTableComment())) {
            sb.append(" COMMENT '").append(quoteComment(schema.getTableComment())).append("' ");
        }
        return sb.toString();
    }

    /** Appends one column definition (name, type, optional DEFAULT, COMMENT) plus a comma. */
    private static void buildColumn(StringBuilder sql, FieldSchema field, boolean isKey) {
        String fieldType = field.getTypeString();
        // Key columns declared as STRING are materialized as bounded VARCHARs.
        if (isKey && DatabendType.STRING.equals(fieldType)) {
            fieldType = String.format("%s(%s)", DatabendType.VARCHAR, 65533);
        }
        sql.append(identifier(field.getName())).append(" ").append(fieldType);

        if (field.getDefaultValue() != null) {
            sql.append(" DEFAULT ").append(quoteDefaultValue(field.getDefaultValue()));
        }
        sql.append(" COMMENT '").append(quoteComment(field.getComment())).append("',");
    }

    /** Escapes single quotes so the comment can be embedded in a single-quoted DDL literal. */
    public static String quoteComment(String comment) {
        if (comment == null) {
            return "";
        }
        return comment.replaceAll("'", "\\\\'");
    }

    /** Quotes a default value literal; {@code current_timestamp} must stay unquoted. */
    public static String quoteDefaultValue(String defaultValue) {
        if (defaultValue.equalsIgnoreCase("current_timestamp")) {
            return defaultValue;
        }
        return "'" + defaultValue + "'";
    }

    /**
     * Runs {@code sql} with the given positional {@code params} and collects the values of
     * column {@code columnIndex}, optionally filtered by {@code filterFunc} (null = keep all).
     *
     * @throws DatabendSystemException wrapping any driver/connection failure
     */
    public List<String> extractColumnValuesBySQL(
            String sql, int columnIndex, Predicate<String> filterFunc, Object... params) {

        List<String> columnValues = new ArrayList<>();
        try (PreparedStatement ps =
                jdbcConnectionProvider.getOrCreateConnection().prepareStatement(sql)) {
            if (Objects.nonNull(params) && params.length > 0) {
                for (int i = 0; i < params.length; i++) {
                    ps.setObject(i + 1, params[i]);
                }
            }
            // Close the ResultSet deterministically instead of relying on statement close.
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    String columnValue = rs.getString(columnIndex);
                    if (Objects.isNull(filterFunc) || filterFunc.test(columnValue)) {
                        columnValues.add(columnValue);
                    }
                }
            }
            return columnValues;
        } catch (Exception e) {
            throw new DatabendSystemException(
                    String.format("The following SQL query could not be executed: %s", sql), e);
        }
    }

    /** Back-quotes a single identifier. */
    private static String identifier(String name) {
        return "`" + name + "`";
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package org.apache.flink.connector.databend.catalog.databend;

/** SQL type-name constants used when rendering Databend DDL. Non-instantiable holder. */
public final class DatabendType {
    public static final String BOOLEAN = "BOOLEAN";
    public static final String TINYINT = "TINYINT";
    public static final String SMALLINT = "SMALLINT";
    public static final String INT = "INT";
    public static final String BIGINT = "BIGINT";
    public static final String FLOAT = "FLOAT";
    public static final String DOUBLE = "DOUBLE";
    public static final String DECIMAL = "DECIMAL";
    public static final String DATE = "DATE";
    public static final String DATETIME = "DATETIME";
    public static final String VARCHAR = "VARCHAR";
    public static final String STRING = "STRING";
    public static final String BITMAP = "BITMAP";
    public static final String ARRAY = "ARRAY";
    public static final String JSON = "JSON";
    public static final String MAP = "MAP";

    // Constants holder: suppress the implicit public constructor (Effective Java Item 4).
    private DatabendType() {
        throw new UnsupportedOperationException("DatabendType is a constants holder");
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package org.apache.flink.connector.databend.catalog.databend;

/**
 * Mutable descriptor of a single table column, consumed when building {@code CREATE TABLE} DDL.
 */
public class FieldSchema {
    /** Column name (unquoted). */
    private String name;
    /** SQL type exactly as rendered into DDL, e.g. "INT" or "VARCHAR(255)". */
    private String typeString;
    /** Column comment; may be null (rendered as an empty comment). */
    private String comment;
    /** Literal default value; null means no DEFAULT clause is emitted. */
    private String defaultValue;

    public FieldSchema() {
    }

    public FieldSchema(String name, String typeString, String comment, String defaultValue) {
        this.name = name;
        this.typeString = typeString;
        this.defaultValue = defaultValue;
        this.comment = comment;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getTypeString() {
        return typeString;
    }

    public void setTypeString(String typeString) {
        this.typeString = typeString;
    }

    public String getComment() {
        return comment;
    }

    public void setComment(String comment) {
        this.comment = comment;
    }

    public String getDefaultValue() {
        return defaultValue;
    }

    /** Setter added for consistency — every other mutable field already exposes one. */
    public void setDefaultValue(String defaultValue) {
        this.defaultValue = defaultValue;
    }
}


Loading
Loading