[Feature][Config] Support nested references to external files in configuration files #8984
base: dev
Changes from all commits
9d0a299
8294c05
ed29abf
8aefcf6
9c0fd04
73b4e0a
5ebd068
55972ae
580ecd7
e4b41f7
ccc1d22
6b1ad30
7ab6ec7
2973521
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -335,6 +335,91 @@ sink { | |
- For dynamic parameters, you can use the following format: `-i date=$(date +"%Y%m%d")`. | ||
- Cannot use specified system reserved characters; they will not be replaced by `-i`, such as: `${database_name}`, `${schema_name}`, `${table_name}`, `${schema_full_name}`, `${table_full_name}`, `${primary_key}`, `${unique_key}`, `${field_names}`. For details, please refer to [Sink Parameter Placeholders](sink-options-placeholders.md). | ||
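
The rule in the last bullet can be illustrated with a small sketch. This is not SeaTunnel's actual substitution code; the class `PlaceholderSubstitution` and its `substitute` method are hypothetical names, and the reserved set simply copies the placeholders listed above. It replaces `${name}` with a user-supplied value and leaves reserved or unknown placeholders untouched.

```java
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative only: a hypothetical helper, not part of SeaTunnel. */
final class PlaceholderSubstitution {
    // Reserved names copied from the list above; they must never be replaced.
    private static final Set<String> RESERVED =
            Set.of(
                    "database_name",
                    "schema_name",
                    "table_name",
                    "schema_full_name",
                    "table_full_name",
                    "primary_key",
                    "unique_key",
                    "field_names");

    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    /** Replaces ${name} with variables.get(name) unless name is reserved or unknown. */
    static String substitute(String text, Map<String, String> variables) {
        Matcher matcher = PLACEHOLDER.matcher(text);
        StringBuilder out = new StringBuilder();
        while (matcher.find()) {
            String name = matcher.group(1);
            String value = RESERVED.contains(name) ? null : variables.get(name);
            // Keep the original placeholder text when there is nothing to substitute.
            String replacement = value != null ? value : matcher.group();
            matcher.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> variables = Map.of("date", "20240101", "table_name", "ignored");
        System.out.println(substitute("table = \"${table_name}_${date}\"", variables));
    }
}
```

Even though the caller passes a value for `table_name`, the sketch keeps `${table_name}` in the output because the name is reserved.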
|
||
|
||
## Configuration References | ||
|
||
You can define common configuration in a separate file and reference it directly from the Job configuration, so that the same settings are reused. Typical uses are sharing connection information and keeping separate variable configuration files for development and production environments, which makes the configuration much easier to maintain. | ||
|
||
### Reference examples | ||
|
||
ref.conf | ||
```hocon | ||
# Define the MySQL connection configuration | ||
mysql_prod { | ||
url = "jdbc:mysql://192.168.1.19:3306/test" | ||
driver = "com.mysql.cj.jdbc.Driver" | ||
connection_check_timeout_sec = 100 | ||
user = "root" | ||
password = "111111" | ||
fetch_size = 10000 | ||
query="select * from not_exist" | ||
} | ||
|
||
# Define the Kafka connection configuration | ||
kafka_test { | ||
# Review comment on lines +357 to +359: ditto |
||
bootstrap.servers = "kafka-test:9092" | ||
topic = "test_topic" | ||
} | ||
``` | ||
|
||
mysql_to_console_by_ref.conf | ||
```hocon | ||
env { | ||
parallelism = 1 | ||
job.mode = "BATCH" | ||
__st_config_ref_path__ = "D:/MyWorld/05Projects/java/contribute/seatunnel/seatunnel-examples/seatunnel-engine-examples/target/classes/examples/ref.conf" | ||
} | ||
|
||
source { | ||
Jdbc { | ||
__st_config_ref_key__ = "mysql_prod" | ||
query="select * from department" | ||
} | ||
} | ||
|
||
transform { | ||
} | ||
|
||
sink { | ||
console { | ||
} | ||
} | ||
``` | ||
If a parameter is defined in both the `plugin` configuration and the `ref` configuration, the plugin value takes priority: `plugin > ref`. | ||
|
||
Therefore, after merging, the `query` parameter keeps the value set in the plugin: `query="select * from department"`. | ||
|
||
The final submitted configuration is then: | ||
```hocon | ||
env { | ||
parallelism = 1 | ||
job.mode = "BATCH" | ||
__st_config_ref_path__ = "examples/ref.conf" | ||
} | ||
|
||
source { | ||
Jdbc { | ||
url = "jdbc:mysql://192.168.1.19:3306/test" | ||
driver = "com.mysql.cj.jdbc.Driver" | ||
connection_check_timeout_sec = 100 | ||
user = "root" | ||
password = "111111" | ||
fetch_size = 10000 | ||
query="select * from department" | ||
} | ||
} | ||
|
||
transform { | ||
} | ||
|
||
sink { | ||
console { | ||
} | ||
} | ||
``` | ||
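
The `plugin > ref` priority described above can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the code from this pull request: the class `RefConfigMerge` and its `resolve` method are hypothetical, and the sketch assumes the reference key is dropped from the merged result, as the final configuration shown above suggests.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: resolves a plugin's reference key against a map of ref blocks. */
final class RefConfigMerge {
    // Key name taken from the example job file above.
    static final String REF_KEY = "__st_config_ref_key__";

    /**
     * Returns a new map holding the referenced block's entries, overridden by the
     * plugin's own entries (plugin > ref). The reference key itself is dropped.
     */
    static Map<String, Object> resolve(
            Map<String, Object> plugin, Map<String, Map<String, Object>> refs) {
        Map<String, Object> merged = new LinkedHashMap<>();
        Object refName = plugin.get(REF_KEY);
        if (refName != null) {
            Map<String, Object> ref = refs.get(refName.toString());
            if (ref == null) {
                throw new IllegalArgumentException("Unknown ref key: " + refName);
            }
            merged.putAll(ref); // lower priority: values from the ref file
        }
        merged.putAll(plugin); // higher priority: values set in the plugin
        merged.remove(REF_KEY); // the marker is not a real plugin option
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> mysqlProd = new LinkedHashMap<>();
        mysqlProd.put("url", "jdbc:mysql://192.168.1.19:3306/test");
        mysqlProd.put("query", "select * from not_exist");

        Map<String, Map<String, Object>> refs = new LinkedHashMap<>();
        refs.put("mysql_prod", mysqlProd);

        Map<String, Object> jdbc = new LinkedHashMap<>();
        jdbc.put(REF_KEY, "mysql_prod");
        jdbc.put("query", "select * from department");

        // The merged map keeps url from the ref block and query from the plugin.
        System.out.println(resolve(jdbc, refs));
    }
}
```

Copying the ref block first and the plugin entries second means no explicit conflict check is needed: the later `putAll` simply overwrites any duplicate key.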
|
||
|
||
|
||
## What's More | ||
|
||
- Start writing your own config file now: choose the [connector](../connector-v2/source) you want to use, and configure the parameters according to the connector's documentation. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -322,7 +322,88 @@ sink { | |
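- Values cannot contain spaces `' '`. For example, `-i jobName='this is a job name'` will be replaced with `job.name = "this"`. You can use environment variables to pass values that contain spaces. | ||
- To use dynamic parameters, you can use the following format: `-i date=$(date +"%Y%m%d")`. | ||
- Specified system reserved characters cannot be used; they will not be replaced by `-i`, such as: `${database_name}`, `${schema_name}`, `${table_name}`, `${schema_full_name}`, `${table_full_name}`, `${primary_key}`, `${unique_key}`, `${field_names}`. For details, see [Sink Parameter Placeholders](sink-options-placeholders.md). | ||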
|
||
## Configuration References | ||
|
||
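In the configuration file, you can define common configuration and reference it directly in the Job configuration, so that the same configuration is reused. | ||
It is commonly used to reuse connection information and to keep separate variable configuration files for development and production environments, which greatly improves configuration maintainability. | ||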
|
||
Example: | ||
ref.conf | ||
```hocon | ||
# Define the MySQL connection configuration | ||
mysql_prod { | ||
url = "jdbc:mysql://192.168.1.19:3306/test" | ||
driver = "com.mysql.cj.jdbc.Driver" | ||
connection_check_timeout_sec = 100 | ||
user = "root" | ||
password = "111111" | ||
fetch_size = 10000 | ||
query="select * from not_exist" | ||
} | ||
|
||
# Define the Kafka connection configuration | ||
kafka_test { | ||
bootstrap.servers = "kafka-test:9092" | ||
topic = "test_topic" | ||
} | ||
``` | ||
|
||
mysql_to_console_by_ref.conf | ||
```hocon | ||
env { | ||
parallelism = 1 | ||
job.mode = "BATCH" | ||
__st_config_ref_path__ = "D:/MyWorld/05Projects/java/contribute/seatunnel/seatunnel-examples/seatunnel-engine-examples/target/classes/examples/ref.conf" | ||
} | ||
|
||
source { | ||
Jdbc { | ||
__st_config_ref_key__ = "mysql_prod" | ||
query="select * from department" | ||
# Review comment: From the code, looks if the
# Please add the priority for replacements when there are duplicate parameters.
# Reply: got it
# Reply: We can just describe it in the document to let users know this thing. Replace behavior is good to me.
# Reply: The configuration priority has been added. |
||
} | ||
} | ||
|
||
transform { | ||
} | ||
|
||
sink { | ||
console { | ||
} | ||
} | ||
``` | ||
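If a configuration item with the same name is defined in both the `plugin` configuration and the `ref` configuration, the priority is: `plugin configuration > ref configuration`. | ||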
|
||
Therefore, when the configurations are merged, the `query` setting from the plugin is used: `query="select * from department"`. | ||
The final submitted configuration is then: | ||
```hocon | ||
env { | ||
parallelism = 1 | ||
job.mode = "BATCH" | ||
__st_config_ref_path__ = "examples/ref.conf" | ||
} | ||
|
||
source { | ||
Jdbc { | ||
url = "jdbc:mysql://192.168.1.19:3306/test" | ||
driver = "com.mysql.cj.jdbc.Driver" | ||
connection_check_timeout_sec = 100 | ||
user = "root" | ||
password = "111111" | ||
fetch_size = 10000 | ||
query="select * from department" | ||
} | ||
} | ||
|
||
transform { | ||
} | ||
|
||
sink { | ||
console { | ||
} | ||
} | ||
``` | ||
|
||
## What's More | ||
|
||
If you want to know more details about the configuration format, please check [HOCON](https://github.com/lightbend/config/blob/main/HOCON.md). | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -107,4 +107,10 @@ public class EnvCommonOptions { | |
.mapType() | ||
.noDefaultValue() | ||
.withDescription("Define the worker where the job runs by tag"); | ||
|
||
public static Option<String> REF_PATH = | ||
Options.key("__st_config_ref_path__") | ||
// Review reply: Not surprising, it cannot conflict with normal attributes |
||
.stringType() | ||
.defaultValue(null) | ||
.withDescription("Define the ref config file path"); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -173,29 +173,50 @@ private static Config processConfig( | |
String jsonString = config.root().render(ConfigRenderOptions.concise()); | ||
ObjectNode jsonNodes = JsonUtils.parseObject(jsonString); | ||
Map<String, Object> configMap = JsonUtils.toMap(jsonNodes); | ||
List<Map<String, Object>> sources = | ||
(ArrayList<Map<String, Object>>) configMap.get(Constants.SOURCE); | ||
List<Map<String, Object>> sinks = | ||
(ArrayList<Map<String, Object>>) configMap.get(Constants.SINK); | ||
Preconditions.checkArgument( | ||
!sources.isEmpty(), "Miss <Source> config! Please check the config file."); | ||
Preconditions.checkArgument( | ||
!sinks.isEmpty(), "Miss <Sink> config! Please check the config file."); | ||
sources.forEach( | ||
source -> { | ||
for (String sensitiveOption : sensitiveOptions) { | ||
source.computeIfPresent(sensitiveOption, processFunction); | ||
} | ||
}); | ||
sinks.forEach( | ||
sink -> { | ||
for (String sensitiveOption : sensitiveOptions) { | ||
sink.computeIfPresent(sensitiveOption, processFunction); | ||
} | ||
}); | ||
configMap.put(Constants.SOURCE, sources); | ||
configMap.put(Constants.SINK, sinks); | ||
return ConfigFactory.parseMap(configMap); | ||
if (configMap.containsKey(Constants.SOURCE)) { | ||
// thirsd marked this conversation as resolved.
|
||
|
||
List<Map<String, Object>> sources = | ||
(ArrayList<Map<String, Object>>) configMap.get(Constants.SOURCE); | ||
List<Map<String, Object>> sinks = | ||
(ArrayList<Map<String, Object>>) configMap.get(Constants.SINK); | ||
Preconditions.checkArgument( | ||
!sources.isEmpty(), "Miss <Source> config! Please check the config file."); | ||
Preconditions.checkArgument( | ||
!sinks.isEmpty(), "Miss <Sink> config! Please check the config file."); | ||
sources.forEach( | ||
source -> { | ||
for (String sensitiveOption : sensitiveOptions) { | ||
source.computeIfPresent(sensitiveOption, processFunction); | ||
} | ||
}); | ||
sinks.forEach( | ||
sink -> { | ||
for (String sensitiveOption : sensitiveOptions) { | ||
sink.computeIfPresent(sensitiveOption, processFunction); | ||
} | ||
}); | ||
configMap.put(Constants.SOURCE, sources); | ||
configMap.put(Constants.SINK, sinks); | ||
return ConfigFactory.parseMap(configMap); | ||
} else { | ||
Map<String, Map<String, Object>> refMap = new HashMap<>(); | ||
// get map element in ref | ||
for (String key : configMap.keySet()) { | ||
Object ref_config = configMap.get(key); | ||
// Review comment: we should name the variables in the camelCase style. |
||
if (ref_config instanceof Map) { | ||
Map<String, Object> refDict = (Map<String, Object>) ref_config; | ||
refMap.put(key, refDict); | ||
} | ||
} | ||
refMap.forEach( | ||
(refId, RefConfig) -> { | ||
Map<String, Object> ref_dit = new HashMap<>(RefConfig.size()); | ||
for (String sensitiveOption : sensitiveOptions) { | ||
ref_dit.computeIfPresent(sensitiveOption, processFunction); | ||
} | ||
}); | ||
return ConfigFactory.parseMap(refMap); | ||
} | ||
} | ||
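
In the `else` branch above, `ref_dit` is created as a new, empty map, so `computeIfPresent` never finds a sensitive option and the blocks in `refMap` are returned unprocessed. The following is a minimal sketch of one possible correction, written against plain `java.util` types so it stands alone; the class `RefSensitiveOptions`, the method `process`, and the `BiFunction<String, Object, Object>` signature are assumptions for illustration, not the project's actual API.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

/** Illustrative only: a hypothetical helper, not part of SeaTunnel. */
final class RefSensitiveOptions {
    /**
     * Applies processFunction to every sensitive option present in each ref block.
     * The blocks are updated in place, so the returned map carries the processed values.
     */
    static Map<String, Map<String, Object>> process(
            Map<String, Map<String, Object>> refMap,
            List<String> sensitiveOptions,
            BiFunction<String, Object, Object> processFunction) {
        refMap.forEach(
                (refId, refConfig) -> {
                    for (String sensitiveOption : sensitiveOptions) {
                        // Operate on refConfig itself rather than on a fresh empty map.
                        refConfig.computeIfPresent(sensitiveOption, processFunction);
                    }
                });
        return refMap;
    }

    public static void main(String[] args) {
        Map<String, Object> mysqlProd = new HashMap<>();
        mysqlProd.put("user", "root");
        mysqlProd.put("password", "111111");

        Map<String, Map<String, Object>> refMap = new HashMap<>();
        refMap.put("mysql_prod", mysqlProd);

        // A stand-in for encryption or decryption: mask the value.
        process(refMap, List.of("password"), (key, value) -> "******");
        System.out.println(refMap.get("mysql_prod").get("password"));
    }
}
```

Mutating each block in place keeps the structure of `refMap` intact, which matches how the `sources` and `sinks` lists are handled in the first branch.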
|
||
public static Set<String> getSensitiveOptions(Config config) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one or more | ||
# contributor license agreements. See the NOTICE file distributed with | ||
# this work for additional information regarding copyright ownership. | ||
# The ASF licenses this file to You under the Apache License, Version 2.0 | ||
# (the "License"); you may not use this file except in compliance with | ||
# the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# | ||
|
||
env { | ||
parallelism = 1 | ||
job.mode = "BATCH" | ||
__st_config_ref_path__ = "./common/ref.conf" | ||
} | ||
|
||
source { | ||
Jdbc { | ||
__st_config_ref_key__ = "mysql_prod" | ||
query="select * from department" | ||
} | ||
} | ||
|
||
transform { | ||
} | ||
|
||
sink { | ||
console { | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one or more | ||
# contributor license agreements. See the NOTICE file distributed with | ||
# this work for additional information regarding copyright ownership. | ||
# The ASF licenses this file to You under the Apache License, Version 2.0 | ||
# (the "License"); you may not use this file except in compliance with | ||
# the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# | ||
|
||
# Define the MySQL connection configuration | ||
mysql_prod { | ||
url = "jdbc:mysql://192.168.1.19:3306/test" | ||
driver = "com.mysql.cj.jdbc.Driver" | ||
connection_check_timeout_sec = 100 | ||
user = "root" | ||
password = "111111" | ||
fetch_size = 10000 | ||
query="select * from not_exist" | ||
} | ||
|
||
# Define the Kafka connection configuration | ||
kafka_test { | ||
bootstrap.servers = "kafka-test:9092" | ||
topic = "test_topic" | ||
} | ||
|
||
sqlite_test { | ||
url = "jdbc:sqlite:D:/MyWorld/05Projects/java/Seatunnel/SeaTunnel/db/test.db" | ||
driver = "org.sqlite.JDBC" | ||
} | ||
|
@thirsd Please describe in English here, thanks.