Skip to content

Commit 7d6027e

Browse files
committed
[FLINK-38257][SQL] Add module dynamic function loading possibility
1 parent 0eeab73 commit 7d6027e

File tree

7 files changed

+256
-39
lines changed

7 files changed

+256
-39
lines changed

flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointMetadataTableFunction.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,15 @@
2424
import org.apache.flink.state.api.runtime.SavepointLoader;
2525
import org.apache.flink.table.annotation.DataTypeHint;
2626
import org.apache.flink.table.annotation.FunctionHint;
27+
import org.apache.flink.table.api.DataTypes;
28+
import org.apache.flink.table.functions.BuiltInFunctionDefinition;
2729
import org.apache.flink.table.functions.SpecializedFunction;
2830
import org.apache.flink.table.functions.TableFunction;
31+
import org.apache.flink.table.types.inference.TypeStrategies;
2932
import org.apache.flink.types.Row;
3033

34+
import static org.apache.flink.table.functions.FunctionKind.TABLE;
35+
3136
@Internal
3237
@FunctionHint(
3338
output =
@@ -41,6 +46,39 @@
4146
+ "operator-coordinator-state-size-in-bytes BIGINT NOT NULL, "
4247
+ "operator-total-size-in-bytes BIGINT NOT NULL>"))
4348
public class SavepointMetadataTableFunction extends TableFunction<Row> {
49+
50+
public static final BuiltInFunctionDefinition SAVEPOINT_METADATA =
51+
BuiltInFunctionDefinition.newBuilder()
52+
.name("savepoint_metadata")
53+
.kind(TABLE)
54+
.runtimeClass(SavepointMetadataTableFunction.class.getName())
55+
.outputTypeStrategy(
56+
TypeStrategies.explicit(
57+
DataTypes.ROW(
58+
DataTypes.FIELD(
59+
"checkpoint-id", DataTypes.BIGINT().notNull()),
60+
DataTypes.FIELD("operator-name", DataTypes.STRING()),
61+
DataTypes.FIELD("operator-uid", DataTypes.STRING()),
62+
DataTypes.FIELD(
63+
"operator-uid-hash",
64+
DataTypes.STRING().notNull()),
65+
DataTypes.FIELD(
66+
"operator-parallelism",
67+
DataTypes.INT().notNull()),
68+
DataTypes.FIELD(
69+
"operator-max-parallelism",
70+
DataTypes.INT().notNull()),
71+
DataTypes.FIELD(
72+
"operator-subtask-state-count",
73+
DataTypes.INT().notNull()),
74+
DataTypes.FIELD(
75+
"operator-coordinator-state-size-in-bytes",
76+
DataTypes.BIGINT().notNull()),
77+
DataTypes.FIELD(
78+
"operator-total-size-in-bytes",
79+
DataTypes.BIGINT().notNull()))))
80+
.build();
81+
4482
public SavepointMetadataTableFunction(SpecializedFunction.SpecializedContext context) {}
4583

4684
public void eval(String savepointPath) {

flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/module/StateModule.java

Lines changed: 15 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -20,70 +20,46 @@
2020

2121
import org.apache.flink.annotation.Experimental;
2222
import org.apache.flink.state.table.SavepointMetadataTableFunction;
23-
import org.apache.flink.table.api.DataTypes;
2423
import org.apache.flink.table.functions.BuiltInFunctionDefinition;
24+
import org.apache.flink.table.functions.DynamicBuiltInFunctionDefinitionFactory;
2525
import org.apache.flink.table.functions.FunctionDefinition;
2626
import org.apache.flink.table.module.Module;
27-
import org.apache.flink.table.types.inference.TypeStrategies;
2827

29-
import java.util.Collections;
28+
import java.util.ArrayList;
3029
import java.util.List;
3130
import java.util.Locale;
3231
import java.util.Map;
3332
import java.util.Optional;
33+
import java.util.ServiceLoader;
3434
import java.util.Set;
3535
import java.util.function.Function;
3636
import java.util.stream.Collectors;
3737

38-
import static org.apache.flink.table.functions.FunctionKind.TABLE;
39-
4038
/** Module of state in Flink. */
4139
@Experimental
4240
public class StateModule implements Module {
4341

4442
public static final String IDENTIFIER = "state";
4543

46-
public static final BuiltInFunctionDefinition SAVEPOINT_METADATA =
47-
BuiltInFunctionDefinition.newBuilder()
48-
.name("savepoint_metadata")
49-
.kind(TABLE)
50-
.runtimeClass(SavepointMetadataTableFunction.class.getName())
51-
.outputTypeStrategy(
52-
TypeStrategies.explicit(
53-
DataTypes.ROW(
54-
DataTypes.FIELD(
55-
"checkpoint-id", DataTypes.BIGINT().notNull()),
56-
DataTypes.FIELD("operator-name", DataTypes.STRING()),
57-
DataTypes.FIELD("operator-uid", DataTypes.STRING()),
58-
DataTypes.FIELD(
59-
"operator-uid-hash",
60-
DataTypes.STRING().notNull()),
61-
DataTypes.FIELD(
62-
"operator-parallelism",
63-
DataTypes.INT().notNull()),
64-
DataTypes.FIELD(
65-
"operator-max-parallelism",
66-
DataTypes.INT().notNull()),
67-
DataTypes.FIELD(
68-
"operator-subtask-state-count",
69-
DataTypes.INT().notNull()),
70-
DataTypes.FIELD(
71-
"operator-coordinator-state-size-in-bytes",
72-
DataTypes.BIGINT().notNull()),
73-
DataTypes.FIELD(
74-
"operator-total-size-in-bytes",
75-
DataTypes.BIGINT().notNull()))))
76-
.build();
77-
7844
public static final StateModule INSTANCE = new StateModule();
7945

8046
private final Map<String, BuiltInFunctionDefinition> normalizedFunctions;
8147
private final Set<String> functionNamesWithInternal;
8248
private final Set<String> functionNamesWithoutInternal;
8349

8450
private StateModule() {
85-
final List<BuiltInFunctionDefinition> definitions =
86-
Collections.singletonList(SAVEPOINT_METADATA);
51+
final List<BuiltInFunctionDefinition> definitions = new ArrayList<>();
52+
53+
definitions.add(SavepointMetadataTableFunction.SAVEPOINT_METADATA);
54+
ServiceLoader.load(DynamicBuiltInFunctionDefinitionFactory.class)
55+
.iterator()
56+
.forEachRemaining(
57+
f -> {
58+
if (f.factoryIdentifier().startsWith(IDENTIFIER)) {
59+
definitions.addAll(f.getBuiltInFunctionDefinitions());
60+
}
61+
});
62+
8763
this.normalizedFunctions =
8864
definitions.stream()
8965
.collect(
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.state.table;
20+
21+
import org.apache.flink.api.common.RuntimeExecutionMode;
22+
import org.apache.flink.configuration.Configuration;
23+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
24+
import org.apache.flink.table.api.Table;
25+
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
26+
import org.apache.flink.types.Row;
27+
28+
import org.junit.jupiter.api.Test;
29+
30+
import java.util.Iterator;
31+
import java.util.List;
32+
33+
import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
34+
import static org.assertj.core.api.Assertions.assertThat;
35+
36+
/** Unit tests for the savepoint SQL reader. */
37+
public class StateModuleTest {
38+
@Test
39+
public void testDynamicBuiltinFunction() throws Exception {
40+
Configuration config = new Configuration();
41+
config.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
42+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
43+
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
44+
45+
tEnv.executeSql("LOAD MODULE state");
46+
Table table = tEnv.sqlQuery("SELECT * FROM example_dynamic_table_function()");
47+
List<Row> result = tEnv.toDataStream(table).executeAndCollect(100);
48+
49+
assertThat(result.size()).isEqualTo(1);
50+
Iterator<Row> it = result.iterator();
51+
assertThat(it.next().toString()).isEqualTo("+I[my-value]");
52+
}
53+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.state.table.module;
20+
21+
import org.apache.flink.table.functions.BuiltInFunctionDefinition;
22+
import org.apache.flink.table.functions.DynamicBuiltInFunctionDefinitionFactory;
23+
24+
import java.util.List;
25+
26+
public class ExampleDynamicBuiltInFunctionDefinitionFactory
27+
implements DynamicBuiltInFunctionDefinitionFactory {
28+
@Override
29+
public String factoryIdentifier() {
30+
return StateModule.IDENTIFIER + ".example_function_factory";
31+
}
32+
33+
@Override
34+
public List<BuiltInFunctionDefinition> getBuiltInFunctionDefinitions() {
35+
return List.of(ExampleDynamicTableFunction.FUNCTION_DEFINITION);
36+
}
37+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.state.table.module;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.table.annotation.DataTypeHint;
23+
import org.apache.flink.table.annotation.FunctionHint;
24+
import org.apache.flink.table.api.DataTypes;
25+
import org.apache.flink.table.functions.BuiltInFunctionDefinition;
26+
import org.apache.flink.table.functions.SpecializedFunction;
27+
import org.apache.flink.table.functions.TableFunction;
28+
import org.apache.flink.table.types.inference.TypeStrategies;
29+
import org.apache.flink.types.Row;
30+
31+
import static org.apache.flink.table.functions.FunctionKind.TABLE;
32+
33+
@Internal
34+
@FunctionHint(output = @DataTypeHint("ROW<my-column STRING NOT NULL>"))
35+
public class ExampleDynamicTableFunction extends TableFunction<Row> {
36+
37+
public static final BuiltInFunctionDefinition FUNCTION_DEFINITION =
38+
BuiltInFunctionDefinition.newBuilder()
39+
.name("example_dynamic_table_function")
40+
.kind(TABLE)
41+
.runtimeClass(ExampleDynamicTableFunction.class.getName())
42+
.outputTypeStrategy(
43+
TypeStrategies.explicit(
44+
DataTypes.ROW(
45+
DataTypes.FIELD(
46+
"my-column", DataTypes.STRING().notNull()))))
47+
.build();
48+
49+
public ExampleDynamicTableFunction(SpecializedFunction.SpecializedContext context) {}
50+
51+
public void eval() {
52+
Row row = Row.withNames();
53+
row.setField("my-column", "my-value");
54+
collect(row);
55+
}
56+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
org.apache.flink.state.table.module.ExampleDynamicBuiltInFunctionDefinitionFactory
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.functions;
20+
21+
import org.apache.flink.annotation.Experimental;
22+
23+
import java.util.List;
24+
25+
/**
26+
* {@link BuiltInFunctionDefinition} factory for dynamic function registration. This is useful
27+
* when functions should be loaded automatically by SQL modules. For example, the state processor
28+
* API SQL module can use it to expose extra table functions provided by the Kafka connector.
29+
*/
30+
@Experimental
31+
public interface DynamicBuiltInFunctionDefinitionFactory {
32+
/**
33+
* Returns the unique identifier of the factory. The suggested pattern is
34+
* [module-name].[factory-name], so that each module can load all factories whose identifier is
35+
* prefixed with its own [module-name].
36+
*/
37+
String factoryIdentifier();
38+
39+
/** Returns list of {@link BuiltInFunctionDefinition} which can be registered dynamically. */
40+
List<BuiltInFunctionDefinition> getBuiltInFunctionDefinitions();
41+
}

0 commit comments

Comments
 (0)