Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve] jdbc options #9046

Open
wants to merge 3 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ public void checkConnectorOptionExist() {

private Set<String> buildWhiteList() {
Set<String> whiteList = new HashSet<>();
whiteList.add("JdbcSinkOptions");
whiteList.add("TypesenseSourceOptions");
whiteList.add("TypesenseSinkOptions");
whiteList.add("PulsarSinkOptions");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,70 +20,71 @@
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceOptions;

public interface JdbcCatalogOptions {
Option<String> BASE_URL =
public class JdbcCatalogOptions {

public static final Option<String> BASE_URL =
Options.key("base-url")
.stringType()
.noDefaultValue()
.withDescription(
"URL has to be with database, like \"jdbc:mysql://localhost:5432/db\" or"
+ "\"jdbc:mysql://localhost:5432/db?useSSL=true\".");

Option<String> USERNAME =
public static final Option<String> USERNAME =
Options.key("username")
.stringType()
.noDefaultValue()
.withDescription(
"Name of the database to use when connecting to the database server.");

Option<String> PASSWORD =
public static final Option<String> PASSWORD =
Options.key("password")
.stringType()
.noDefaultValue()
.withDescription("Password to use when connecting to the database server.");

Option<String> SCHEMA =
public static final Option<String> SCHEMA =
Options.key("schema")
.stringType()
.noDefaultValue()
.withDescription(
"for databases that support the schema parameter, give it priority.");

Option<String> COMPATIBLE_MODE =
public static final Option<String> COMPATIBLE_MODE =
Options.key("compatibleMode")
.stringType()
.noDefaultValue()
.withDescription(
"The compatible mode of database, required when the database supports multiple compatible modes. "
+ "For example, when using OceanBase database, you need to set it to 'mysql' or 'oracle'.");

OptionRule.Builder BASE_RULE =
OptionRule.builder()
.required(BASE_URL)
.required(USERNAME, PASSWORD)
.optional(SCHEMA, JdbcOptions.DECIMAL_TYPE_NARROWING);

Option<String> TABLE_PREFIX =
public static final Option<String> TABLE_PREFIX =
Options.key("tablePrefix")
.stringType()
.noDefaultValue()
.withDescription(
"The table prefix name added when the table is automatically created");

Option<String> TABLE_SUFFIX =
public static final Option<String> TABLE_SUFFIX =
Options.key("tableSuffix")
.stringType()
.noDefaultValue()
.withDescription(
"The table suffix name added when the table is automatically created");

Option<Boolean> CREATE_INDEX =
public static final Option<Boolean> CREATE_INDEX =
Options.key("create_index")
.booleanType()
.defaultValue(true)
.withDescription("Create index or not when auto create table");

Option<String> DRIVER = Options.key("driver").stringType().noDefaultValue();
public static final Option<String> DRIVER = Options.key("driver").stringType().noDefaultValue();

public static final OptionRule.Builder BASE_RULE =
OptionRule.builder()
.required(BASE_URL)
.required(USERNAME, PASSWORD)
.optional(SCHEMA, JdbcSourceOptions.DECIMAL_TYPE_NARROWING);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleTypeConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleTypeMapper;

Expand Down Expand Up @@ -89,7 +89,7 @@ public OracleCatalog(
pwd,
urlInfo,
defaultSchema,
JdbcOptions.DECIMAL_TYPE_NARROWING.defaultValue(),
JdbcSourceOptions.DECIMAL_TYPE_NARROWING.defaultValue(),
driverClass);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;

import com.google.auto.service.AutoService;
Expand Down Expand Up @@ -54,8 +54,8 @@ public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
options.get(JdbcCatalogOptions.PASSWORD),
urlInfo,
options.get(JdbcCatalogOptions.SCHEMA),
options.get(JdbcOptions.DECIMAL_TYPE_NARROWING),
options.get(JdbcOptions.DRIVER));
options.get(JdbcSourceOptions.DECIMAL_TYPE_NARROWING),
options.get(JdbcSourceOptions.DRIVER));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.config;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.options.SinkConnectorCommonOptions;

public class JdbcBaseOptions extends SinkConnectorCommonOptions {

public static final Option<String> URL =
Options.key("url").stringType().noDefaultValue().withDescription("url");

public static final Option<String> DRIVER =
Options.key("driver").stringType().noDefaultValue().withDescription("driver");

public static final Option<String> USER =
Options.key("user").stringType().noDefaultValue().withDescription("user");

public static final Option<String> PASSWORD =
Options.key("password").stringType().noDefaultValue().withDescription("password");

public static final Option<String> QUERY =
Options.key("query").stringType().noDefaultValue().withDescription("query");

public static final Option<Integer> CONNECTION_CHECK_TIMEOUT_SEC =
Options.key("connection_check_timeout_sec")
.intType()
.defaultValue(30)
.withDescription("connection check time second");

public static final Option<String> COMPATIBLE_MODE =
Options.key("compatible_mode")
.stringType()
.noDefaultValue()
.withDescription(
"The compatible mode of database, required when the database supports multiple compatible modes. For example, when using OceanBase database, you need to set it to 'mysql' or 'oracle'.");

/*For Hive */
public static final Option<Boolean> USE_KERBEROS =
Options.key("use_kerberos")
.booleanType()
.defaultValue(false)
.withDescription("Whether to enable Kerberos, default is false.");

public static final Option<String> KERBEROS_PRINCIPAL =
Options.key("kerberos_principal")
.stringType()
.noDefaultValue()
.withDescription(
"When use kerberos, we should set kerberos principal such as 'test_user@xxx'. ");

public static final Option<String> KERBEROS_KEYTAB_PATH =
Options.key("kerberos_keytab_path")
.stringType()
.noDefaultValue()
.withDescription(
"When use kerberos, we should set kerberos principal file path such as '/home/test/test_user.keytab'. ");

public static final Option<String> KRB5_PATH =
Options.key("krb5_path")
.stringType()
.defaultValue("/etc/krb5.conf")
.withDescription(
"When use kerberos, we should set krb5 path file path such as '/seatunnel/krb5.conf' or use the default path '/etc/krb5.conf");
/*For Hive End */

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,59 +31,60 @@ public class JdbcConnectionConfig implements Serializable {
public String driverName;
public String compatibleMode;
public int connectionCheckTimeoutSeconds =
JdbcOptions.CONNECTION_CHECK_TIMEOUT_SEC.defaultValue();
public int maxRetries = JdbcOptions.MAX_RETRIES.defaultValue();
JdbcBaseOptions.CONNECTION_CHECK_TIMEOUT_SEC.defaultValue();
public int maxRetries = JdbcSinkOptions.MAX_RETRIES.defaultValue();
public String username;
public String password;
public String query;

public boolean autoCommit = JdbcOptions.AUTO_COMMIT.defaultValue();
public boolean autoCommit = JdbcSinkOptions.AUTO_COMMIT.defaultValue();

public int batchSize = JdbcOptions.BATCH_SIZE.defaultValue();
public int batchSize = JdbcSinkOptions.BATCH_SIZE.defaultValue();

public String xaDataSourceClassName;

public boolean decimalTypeNarrowing = JdbcOptions.DECIMAL_TYPE_NARROWING.defaultValue();
public boolean decimalTypeNarrowing = JdbcSourceOptions.DECIMAL_TYPE_NARROWING.defaultValue();

public int maxCommitAttempts = JdbcOptions.MAX_COMMIT_ATTEMPTS.defaultValue();
public int maxCommitAttempts = JdbcSinkOptions.MAX_COMMIT_ATTEMPTS.defaultValue();

public int transactionTimeoutSec = JdbcOptions.TRANSACTION_TIMEOUT_SEC.defaultValue();
public int transactionTimeoutSec = JdbcSinkOptions.TRANSACTION_TIMEOUT_SEC.defaultValue();

public boolean useKerberos = JdbcOptions.USE_KERBEROS.defaultValue();
public boolean useKerberos = JdbcSourceOptions.USE_KERBEROS.defaultValue();

public String kerberosPrincipal;

public String kerberosKeytabPath;

public String krb5Path = JdbcOptions.KRB5_PATH.defaultValue();
public String krb5Path = JdbcSourceOptions.KRB5_PATH.defaultValue();

private Map<String, String> properties;

public static JdbcConnectionConfig of(ReadonlyConfig config) {
JdbcConnectionConfig.Builder builder = JdbcConnectionConfig.builder();
builder.url(config.get(JdbcOptions.URL));
builder.compatibleMode(config.get(JdbcOptions.COMPATIBLE_MODE));
builder.driverName(config.get(JdbcOptions.DRIVER));
builder.autoCommit(config.get(JdbcOptions.AUTO_COMMIT));
builder.maxRetries(config.get(JdbcOptions.MAX_RETRIES));
builder.connectionCheckTimeoutSeconds(config.get(JdbcOptions.CONNECTION_CHECK_TIMEOUT_SEC));
builder.batchSize(config.get(JdbcOptions.BATCH_SIZE));
if (config.get(JdbcOptions.IS_EXACTLY_ONCE)) {
builder.xaDataSourceClassName(config.get(JdbcOptions.XA_DATA_SOURCE_CLASS_NAME));
builder.maxCommitAttempts(config.get(JdbcOptions.MAX_COMMIT_ATTEMPTS));
builder.transactionTimeoutSec(config.get(JdbcOptions.TRANSACTION_TIMEOUT_SEC));
builder.url(config.get(JdbcBaseOptions.URL));
builder.compatibleMode(config.get(JdbcBaseOptions.COMPATIBLE_MODE));
builder.driverName(config.get(JdbcBaseOptions.DRIVER));
builder.autoCommit(config.get(JdbcSinkOptions.AUTO_COMMIT));
builder.maxRetries(config.get(JdbcSinkOptions.MAX_RETRIES));
builder.connectionCheckTimeoutSeconds(
config.get(JdbcBaseOptions.CONNECTION_CHECK_TIMEOUT_SEC));
builder.batchSize(config.get(JdbcSinkOptions.BATCH_SIZE));
if (config.get(JdbcSinkOptions.IS_EXACTLY_ONCE)) {
builder.xaDataSourceClassName(config.get(JdbcSinkOptions.XA_DATA_SOURCE_CLASS_NAME));
builder.maxCommitAttempts(config.get(JdbcSinkOptions.MAX_COMMIT_ATTEMPTS));
builder.transactionTimeoutSec(config.get(JdbcSinkOptions.TRANSACTION_TIMEOUT_SEC));
builder.maxRetries(0);
}
if (config.get(JdbcOptions.USE_KERBEROS)) {
builder.useKerberos(config.get(JdbcOptions.USE_KERBEROS));
builder.kerberosPrincipal(config.get(JdbcOptions.KERBEROS_PRINCIPAL));
builder.kerberosKeytabPath(config.get(JdbcOptions.KERBEROS_KEYTAB_PATH));
builder.krb5Path(config.get(JdbcOptions.KRB5_PATH));
if (config.get(JdbcBaseOptions.USE_KERBEROS)) {
builder.useKerberos(config.get(JdbcBaseOptions.USE_KERBEROS));
builder.kerberosPrincipal(config.get(JdbcBaseOptions.KERBEROS_PRINCIPAL));
builder.kerberosKeytabPath(config.get(JdbcBaseOptions.KERBEROS_KEYTAB_PATH));
builder.krb5Path(config.get(JdbcBaseOptions.KRB5_PATH));
}
config.getOptional(JdbcOptions.USER).ifPresent(builder::username);
config.getOptional(JdbcOptions.PASSWORD).ifPresent(builder::password);
config.getOptional(JdbcOptions.PROPERTIES).ifPresent(builder::properties);
config.getOptional(JdbcOptions.DECIMAL_TYPE_NARROWING)
config.getOptional(JdbcBaseOptions.USER).ifPresent(builder::username);
config.getOptional(JdbcBaseOptions.PASSWORD).ifPresent(builder::password);
config.getOptional(JdbcSourceOptions.PROPERTIES).ifPresent(builder::properties);
config.getOptional(JdbcSourceOptions.DECIMAL_TYPE_NARROWING)
.ifPresent(builder::decimalTypeNarrowing);
return builder.build();
}
Expand Down Expand Up @@ -153,22 +154,23 @@ public static final class Builder {
private String driverName;
private String compatibleMode;
private int connectionCheckTimeoutSeconds =
JdbcOptions.CONNECTION_CHECK_TIMEOUT_SEC.defaultValue();
private int maxRetries = JdbcOptions.MAX_RETRIES.defaultValue();
JdbcBaseOptions.CONNECTION_CHECK_TIMEOUT_SEC.defaultValue();
private int maxRetries = JdbcSinkOptions.MAX_RETRIES.defaultValue();
private String username;
private String password;
private String query;
private boolean autoCommit = JdbcOptions.AUTO_COMMIT.defaultValue();
private int batchSize = JdbcOptions.BATCH_SIZE.defaultValue();
private boolean autoCommit = JdbcSinkOptions.AUTO_COMMIT.defaultValue();
private int batchSize = JdbcSinkOptions.BATCH_SIZE.defaultValue();
private String xaDataSourceClassName;
private boolean decimalTypeNarrowing = JdbcOptions.DECIMAL_TYPE_NARROWING.defaultValue();
private int maxCommitAttempts = JdbcOptions.MAX_COMMIT_ATTEMPTS.defaultValue();
private int transactionTimeoutSec = JdbcOptions.TRANSACTION_TIMEOUT_SEC.defaultValue();
private boolean decimalTypeNarrowing =
JdbcSourceOptions.DECIMAL_TYPE_NARROWING.defaultValue();
private int maxCommitAttempts = JdbcSinkOptions.MAX_COMMIT_ATTEMPTS.defaultValue();
private int transactionTimeoutSec = JdbcSinkOptions.TRANSACTION_TIMEOUT_SEC.defaultValue();
private Map<String, String> properties;
public boolean useKerberos = JdbcOptions.USE_KERBEROS.defaultValue();
public boolean useKerberos = JdbcSourceOptions.USE_KERBEROS.defaultValue();
public String kerberosPrincipal;
public String kerberosKeytabPath;
public String krb5Path = JdbcOptions.KRB5_PATH.defaultValue();
public String krb5Path = JdbcSourceOptions.KRB5_PATH.defaultValue();

private Builder() {}

Expand Down
Loading
Loading