Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Config] Support nested references to external files in configuration files #8984

Open
wants to merge 14 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions docs/en/concept/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,91 @@ sink {
- For dynamic parameters, you can use the following format: `-i date=$(date +"%Y%m%d")`.
- Cannot use specified system reserved characters; they will not be replaced by `-i`, such as: `${database_name}`, `${schema_name}`, `${table_name}`, `${schema_full_name}`, `${table_full_name}`, `${primary_key}`, `${unique_key}`, `${field_names}`. For details, please refer to [Sink Parameter Placeholders](sink-options-placeholders.md).


## Configuration References

In the configuration file, we can define some common configuration information and directly reference it in the Job configuration, thereby reusing the same configuration. This is commonly applied to the reuse of connection information and the independence of variable configuration files for development and production environments, greatly improving the maintainability of the configuration.

### reference examples

ref.conf
```hocon
# 定义 MySQL 连接配置
mysql_prod {
Comment on lines +346 to +348
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@thirsd Please describe in English here, thanks.

url = "jdbc:mysql://192.168.1.19:3306/test"
driver = "com.mysql.cj.jdbc.Driver"
connection_check_timeout_sec = 100
user = "root"
password = "111111"
fetch_size = 10000
query="select * from not_exist"
}

# 定义 Kafka 连接配置
kafka_test {
Comment on lines +357 to +359
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

bootstrap.servers = "kafka-test:9092"
topic = "test_topic"
}
```

mysql_to_console_by_ref.conf
```hocon
env {
parallelism = 1
job.mode = "BATCH"
__st_config_ref_path__ = "D:/MyWorld/05Projects/java/contribute/seatunnel/seatunnel-examples/seatunnel-engine-examples/target/classes/examples/ref.conf"
}

source {
Jdbc {
__st_config_ref_key__ = "mysql_prod"
query="select * from department"
}
}

transform {
}

sink {
console {
}
}
```
If a parameter is defined in both `plugin` configuration and the `ref` configuration, the priority is:`plugin > ref`。

Therefore, the `query` parameter in the plugin will be merged to `query="select * from department"`。

And then the final submitted configuration is:
```hocon
env {
parallelism = 1
job.mode = "BATCH"
__st_config_ref_path__ = "examples/ref.conf"
}

source {
Jdbc {
url = "jdbc:mysql://192.168.1.19:3306/test"
driver = "com.mysql.cj.jdbc.Driver"
connection_check_timeout_sec = 100
user = "root"
password = "111111"
fetch_size = 10000
query="select * from department"
}
}

transform {
}

sink {
console {
}
}
```



## What's More

- Start write your own config file now, choose the [connector](../connector-v2/source) you want to use, and configure the parameters according to the connector's documentation.
Expand Down
6 changes: 6 additions & 0 deletions docs/zh/concept/JobEnvConfig.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@
当值为`CLIENT`时,SaveMode操作在作业提交的过程中执行,使用shell脚本提交作业时,该过程在提交作业的shell进程中执行。使用rest api提交作业时,该过程在http请求的处理线程中执行。
请尽量使用`CLUSTER`模式,因为当`CLUSTER`模式没有问题时,我们将删除`CLIENT`模式。

### __st_config_ref_path__

此参数用于复用参数配置,通过Ref指定通用的配置文件。
当设置该参数后,就可以在job config中的source、transform、sink中使用`__st_config_ref_key__`指定复用的配置信息,并合并至Job配置信息。


## Flink 引擎参数

这里列出了一些与 Flink 中名称相对应的 SeaTunnel 参数名称,并非全部,更多内容请参考官方 [Flink Documentation](https://flink.apache.org/) for more.
Expand Down
83 changes: 82 additions & 1 deletion docs/zh/concept/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,88 @@ sink {
- 值不能包含空格`' '`。例如, `-i jobName='this is a job name'`将被替换为`job.name = "this"`。 你可以使用环境变量传递带有空格的值。
- 如果要使用动态参数,可以使用以下格式: `-i date=$(date +"%Y%m%d")`。
- 不能使用指定系统保留字符,它将不会被`-i`替换,如:`${database_name}`、`${schema_name}`、`${table_name}`、`${schema_full_name}`、`${table_full_name}`、`${primary_key}`、`${unique_key}`、`${field_names}`。具体可参考[Sink参数占位符](sink-options-placeholders.md)

## 配置引用

在配置文件中,我们可以定义一些通用的配置信息,在Job配置中直接引用,从而复用相同的配置。
常见应用于连接信息复用,以及开发和生产环境的变量配置文件独立,极大提高配置维护性。

具体样例:
ref.conf
```hocon
# 定义 MySQL 连接配置
mysql_prod {
url = "jdbc:mysql://192.168.1.19:3306/test"
driver = "com.mysql.cj.jdbc.Driver"
connection_check_timeout_sec = 100
user = "root"
password = "111111"
fetch_size = 10000
query="select * from not_exist"
}

# 定义 Kafka 连接配置
kafka_test {
bootstrap.servers = "kafka-test:9092"
topic = "test_topic"
}
```

mysql_to_console_by_ref.conf
```hocon
env {
parallelism = 1
job.mode = "BATCH"
__st_config_ref_path__ = "D:/MyWorld/05Projects/java/contribute/seatunnel/seatunnel-examples/seatunnel-engine-examples/target/classes/examples/ref.conf"
}

source {
Jdbc {
__st_config_ref_key__ = "mysql_prod"
query="select * from department"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the code, looks if the mysql_prod ref config also contains query parameter, it will overwrite the existed query parameter.

pluginConfig.withoutPath("__st_config_ref_key__").withFallback(refConfig);

Please add the priority for replacements when there are duplicate parameters.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it

We can just describe it in the document to let use know this thing. Replace behavior is good to me.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已经补充了配置优先级。

}
}

transform {
}

sink {
console {
}
}
```
如果在`plugin`配置和`ref`配置中均定义了同名的配置项,优先级为:`plugin配置 > ref配置`。

故在配置合并时,将使用plugin中的query配置`query="select * from department"`。
然后最终提交的配置是:
```hocon
env {
parallelism = 1
job.mode = "BATCH"
__st_config_ref_path__ = "examples/ref.conf"
}

source {
Jdbc {
url = "jdbc:mysql://192.168.1.19:3306/test"
driver = "com.mysql.cj.jdbc.Driver"
connection_check_timeout_sec = 100
user = "root"
password = "111111"
fetch_size = 10000
query="select * from department"
}
}

transform {
}

sink {
console {
}
}
```

## 此外

如果你想了解更多关于格式配置的详细信息,请查看 [HOCON](https://github.com/lightbend/config/blob/main/HOCON.md)。

Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,10 @@ public class EnvCommonOptions {
.mapType()
.noDefaultValue()
.withDescription("Define the worker where the job runs by tag");

public static Option<String> REF_PATH =
Options.key("__st_config_ref_path__")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

__st_config_ref_path__ looks weird, how about naming it to config_ref_path?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not surprising, he cannot conflict with normal attributes

.stringType()
.defaultValue(null)
.withDescription("Define the ref config file path");
}
Original file line number Diff line number Diff line change
Expand Up @@ -173,29 +173,50 @@ private static Config processConfig(
String jsonString = config.root().render(ConfigRenderOptions.concise());
ObjectNode jsonNodes = JsonUtils.parseObject(jsonString);
Map<String, Object> configMap = JsonUtils.toMap(jsonNodes);
List<Map<String, Object>> sources =
(ArrayList<Map<String, Object>>) configMap.get(Constants.SOURCE);
List<Map<String, Object>> sinks =
(ArrayList<Map<String, Object>>) configMap.get(Constants.SINK);
Preconditions.checkArgument(
!sources.isEmpty(), "Miss <Source> config! Please check the config file.");
Preconditions.checkArgument(
!sinks.isEmpty(), "Miss <Sink> config! Please check the config file.");
sources.forEach(
source -> {
for (String sensitiveOption : sensitiveOptions) {
source.computeIfPresent(sensitiveOption, processFunction);
}
});
sinks.forEach(
sink -> {
for (String sensitiveOption : sensitiveOptions) {
sink.computeIfPresent(sensitiveOption, processFunction);
}
});
configMap.put(Constants.SOURCE, sources);
configMap.put(Constants.SINK, sinks);
return ConfigFactory.parseMap(configMap);
if (configMap.containsKey(Constants.SOURCE)) {

List<Map<String, Object>> sources =
(ArrayList<Map<String, Object>>) configMap.get(Constants.SOURCE);
List<Map<String, Object>> sinks =
(ArrayList<Map<String, Object>>) configMap.get(Constants.SINK);
Preconditions.checkArgument(
!sources.isEmpty(), "Miss <Source> config! Please check the config file.");
Preconditions.checkArgument(
!sinks.isEmpty(), "Miss <Sink> config! Please check the config file.");
sources.forEach(
source -> {
for (String sensitiveOption : sensitiveOptions) {
source.computeIfPresent(sensitiveOption, processFunction);
}
});
sinks.forEach(
sink -> {
for (String sensitiveOption : sensitiveOptions) {
sink.computeIfPresent(sensitiveOption, processFunction);
}
});
configMap.put(Constants.SOURCE, sources);
configMap.put(Constants.SINK, sinks);
return ConfigFactory.parseMap(configMap);
} else {
Map<String, Map<String, Object>> refMap = new HashMap<>();
// get map element in ref
for (String key : configMap.keySet()) {
Object ref_config = configMap.get(key);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should name the variables in the camelcase style.

if (ref_config instanceof Map) {
Map<String, Object> refDict = (Map<String, Object>) ref_config;
refMap.put(key, refDict);
}
}
refMap.forEach(
(refId, RefConfig) -> {
Map<String, Object> ref_dit = new HashMap<>(RefConfig.size());
for (String sensitiveOption : sensitiveOptions) {
ref_dit.computeIfPresent(sensitiveOption, processFunction);
}
});
return ConfigFactory.parseMap(refMap);
}
}

public static Set<String> getSensitiveOptions(Config config) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,28 @@ public void testParseConfig() throws URISyntaxException {
config.getConfigList("source").get(0).getString("secret_key"), SECRET_KEY);
}

@Test
public void testParseRefConfig() throws URISyntaxException {
URL resource = ConfigShadeTest.class.getResource("/ref.conf");
Assertions.assertNotNull(resource);
Config refConfigs = ConfigBuilder.of(Paths.get(resource.toURI()));
// Assertions.assertEquals(config.);
Assertions.assertEquals(
refConfigs.getConfig("mysql_prod").getString("query"), "select * from not_exist");
Assertions.assertEquals(refConfigs.getConfig("mysql_prod").getString("password"), "111111");

URL jobConfigUrl = ConfigShadeTest.class.getResource("/mysql_to_console_by_ref.conf");
Config jobConfig = ConfigBuilder.of(Paths.get(jobConfigUrl.toURI()));

Config mysqlPluginConfig = jobConfig.getConfigList("source").get(0);
String refId = mysqlPluginConfig.getString("__st_config_ref_key__ ");
Config refConfig = refConfigs.getConfig(refId);
Config renderConfig =
mysqlPluginConfig.withoutPath("__st_config_ref_key__").withFallback(refConfig);
Assertions.assertEquals(renderConfig.getString("query"), "select * from department");
Assertions.assertEquals(renderConfig.getString("password"), "111111");
}

@Test
public void testUsePrivacyHandlerHocon() throws URISyntaxException {
URL resource = ConfigShadeTest.class.getResource("/config.shade.conf");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
parallelism = 1
job.mode = "BATCH"
__st_config_ref_path__ = "./common/ref.conf"
}

source {
Jdbc {
__st_config_ref_key__ = "mysql_prod"
query="select * from department"
}
}

transform {
}

sink {
console {
}
}
39 changes: 39 additions & 0 deletions seatunnel-core/seatunnel-core-starter/src/test/resources/ref.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# 定义 MySQL 连接配置
mysql_prod {
url = "jdbc:mysql://192.168.1.19:3306/test"
driver = "com.mysql.cj.jdbc.Driver"
connection_check_timeout_sec = 100
user = "root"
password = "111111"
fetch_size = 10000
query="select * from not_exist"
}

# 定义 Kafka 连接配置
kafka_test {
bootstrap.servers = "kafka-test:9092"
topic = "test_topic"
}

sqlite_test {
url = "jdbc:sqlite:D:/MyWorld/05Projects/java/Seatunnel/SeaTunnel/db/test.db"
driver = "org.sqlite.JDBC"
}

Loading
Loading