-
Notifications
You must be signed in to change notification settings - Fork 13.8k
[FLINK-38257][SQL] Add module dynamic function loading possibility #26933
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The PR cleanly introduces a minimal SPI and wires the state module to load connector-owned TableFunctions
at runtime via ServiceLoader
, moving the built-in savepoint_metadata
into its own TF class and proving the mechanism with an example + test. It achieves the stated goal with a small, well-scoped change and keeps Flink core free of connector dependencies. I wrote only some nits and questions.
@@ -41,6 +46,39 @@ | |||
+ "operator-coordinator-state-size-in-bytes BIGINT NOT NULL, " | |||
+ "operator-total-size-in-bytes BIGINT NOT NULL>")) | |||
public class SavepointMetadataTableFunction extends TableFunction<Row> { | |||
|
|||
public static final BuiltInFunctionDefinition SAVEPOINT_METADATA = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice to keep the definition near the code. do you think we should also mark that the function should not be pre-calculated by the optimizer? i mean to make it clear that flink must always run it at query time, not try to be clever and “pre-compute” it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This one is not yet clear🤷🏻♂️
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe i misunderstood what happens here with savepoint_metadata
, but flink's planner can constant-fold and cache function results at plan time if it believes a function is deterministic. that is correct for pure math/string functions, but could be problematic for something like this savepoint_metadata
, which read external, changing state (fs/checkpoint files). and between planning and exec, or between two exec, the metadata could change. so i just thought whether we should tell the planner this function is non-deterministic, so it will not pre-evaluate or cache it, and will always run it at exec time. idk whether the BuiltInFunctionDefinition.newBuilder()
can supports to call .deterministic(false)
, or use and override the UDF isDeterministic()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's fair point. I'm not fan of fixing something which is unrelated but this is so tiny that adding notDeterministic()
is fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Though it worth the discussion separate whether the planner caches it with input params or not. I would guess yes, otherwise it just doesn't make sense from usage point of view. If the params are stored together with the result then adding notDeterministic()
doesn't hurt but also not helping.
...link-state-processing-api/src/main/java/org/apache/flink/state/table/module/StateModule.java
Outdated
Show resolved
Hide resolved
...link-state-processing-api/src/main/java/org/apache/flink/state/table/module/StateModule.java
Show resolved
Hide resolved
"operator-total-size-in-bytes", | ||
DataTypes.BIGINT().notNull())))) | ||
.build(); | ||
|
||
public static final StateModule INSTANCE = new StateModule(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: this is not part of this pr, but it seems like the constructor does classpath scanning, and that is fine if the module is loaded once, but worth a note: if someone repeatedly constructs modules in tests, scanning happens each time
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think testing the module as it's going to behave in PROD is something which is desired even if it requires some time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, totally fair
my (out-of-scope) nit was only about to maybe add a short javadoc on StateModule
noting that the constructor performs ServiceLoader
discovery, so repeated instantiation re-scans the classpath, and maybe in some tests, future authors can prefer StateModule.INSTANCE
over new StateModule()
to avoid unintentional repeated scans
@Test | ||
public void testDynamicBuiltinFunction() throws Exception { | ||
Configuration config = new Configuration(); | ||
config.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you think we should add a test with streaming mode too and add a negative test for name collisions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Name collision test has been added but streaming would add no value here.
|
||
import java.util.List; | ||
|
||
public class ExampleDynamicBuiltInFunctionDefinitionFactory |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
api looks minimal and clear.
nit: maybe we can standardize to . and document that function names returned must be unique within the module (and what happens otherwise)
nit: maybe we can add an optional ModuleContext
param (classloader, config). not needed now, but helps if a provider later needs context without breaking the spi
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This API is now fully experimental and later subject to change. If the time comes and ModuleContext
or something else is required then we're going to add it. Finding out later needs is just not working in general 🙂
Related the names there is already a builtin function name suggestion which already given. Of course this is not documented because it's not public API and I would stick to that since it exists from ages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes sense, thanks for clarifying
on the “built-in function name suggestion”: understood, it is internal and long-standing, so i am fine relying on it. if feasible, a short internal comment pointing to where that suggestion lives would help new contributors
bottom line: +1 to keep the api minimal as proposed
7d6027e
to
2fe5c62
Compare
2fe5c62
to
6030634
Compare
6030634
to
032dd0e
Compare
What is the purpose of the change
Modules like the state processor API could add SQL functions which are coming from external connectors like Kafka, Iceberg, etc... It would be good to add a generic way to add such methods. The intended end-user interaction would look like the following:
LOAD MODULE state
SELECT * FROM get_kafka_offsets(...)
In this PR I've added purely the possibility to load SQL functions with service loader.
Brief change log
Added module dynamic function loading possibility.
Verifying this change
Added new automated test.
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: noDocumentation