Skip to content

[FLINK-38257][SQL] Add module dynamic function loading possibility #26933

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

gaborgsomogyi
Copy link
Contributor

What is the purpose of the change

Modules like the state processor API could add SQL functions which are coming from external connectors like Kafka, Iceberg, etc... It would be good to add a generic way to add such methods. The intended end-user interaction would look like the following:

  • Add Kafka connector jar to the classpath
  • LOAD MODULE state
  • SELECT * FROM get_kafka_offsets(...)

In this PR I've added purely the possibility to load SQL functions with service loader.

Brief change log

Added module dynamic function loading possibility.

Verifying this change

Added new automated test.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? not applicable

@flinkbot
Copy link
Collaborator

flinkbot commented Aug 22, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Copy link
Contributor

@zch93 zch93 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR cleanly introduces a minimal SPI and wires the state module to load connector-owned TableFunctions at runtime via ServiceLoader, moving the built-in savepoint_metadata into its own TF class and proving the mechanism with an example + test. It achieves the stated goal with a small, well-scoped change and keeps Flink core free of connector dependencies. I wrote only some nits and questions.

@@ -41,6 +46,39 @@
+ "operator-coordinator-state-size-in-bytes BIGINT NOT NULL, "
+ "operator-total-size-in-bytes BIGINT NOT NULL>"))
public class SavepointMetadataTableFunction extends TableFunction<Row> {

public static final BuiltInFunctionDefinition SAVEPOINT_METADATA =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice to keep the definition near the code. do you think we should also mark that the function should not be pre-calculated by the optimizer? i mean to make it clear that flink must always run it at query time, not try to be clever and “pre-compute” it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is not yet clear🤷🏻‍♂️

Copy link
Contributor

@zch93 zch93 Aug 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe i misunderstood what happens here with savepoint_metadata, but flink's planner can constant-fold and cache function results at plan time if it believes a function is deterministic. that is correct for pure math/string functions, but could be problematic for something like this savepoint_metadata, which read external, changing state (fs/checkpoint files). and between planning and exec, or between two exec, the metadata could change. so i just thought whether we should tell the planner this function is non-deterministic, so it will not pre-evaluate or cache it, and will always run it at exec time. idk whether the BuiltInFunctionDefinition.newBuilder() can supports to call .deterministic(false), or use and override the UDF isDeterministic()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fair point. I'm not fan of fixing something which is unrelated but this is so tiny that adding notDeterministic() is fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though it worth the discussion separate whether the planner caches it with input params or not. I would guess yes, otherwise it just doesn't make sense from usage point of view. If the params are stored together with the result then adding notDeterministic() doesn't hurt but also not helping.

"operator-total-size-in-bytes",
DataTypes.BIGINT().notNull()))))
.build();

public static final StateModule INSTANCE = new StateModule();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this is not part of this pr, but it seems like the constructor does classpath scanning, and that is fine if the module is loaded once, but worth a note: if someone repeatedly constructs modules in tests, scanning happens each time

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think testing the module as it's going to behave in PROD is something which is desired even if it requires some time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, totally fair
my (out-of-scope) nit was only about to maybe add a short javadoc on StateModule noting that the constructor performs ServiceLoader discovery, so repeated instantiation re-scans the classpath, and maybe in some tests, future authors can prefer StateModule.INSTANCE over new StateModule() to avoid unintentional repeated scans

@Test
public void testDynamicBuiltinFunction() throws Exception {
Configuration config = new Configuration();
config.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you think we should add a test with streaming mode too and add a negative test for name collisions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Name collision test has been added but streaming would add no value here.


import java.util.List;

public class ExampleDynamicBuiltInFunctionDefinitionFactory
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

api looks minimal and clear.
nit: maybe we can standardize to . and document that function names returned must be unique within the module (and what happens otherwise)
nit: maybe we can add an optional ModuleContext param (classloader, config). not needed now, but helps if a provider later needs context without breaking the spi

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This API is now fully experimental and later subject to change. If the time comes and ModuleContext or something else is required then we're going to add it. Finding out later needs is just not working in general 🙂
Related the names there is already a builtin function name suggestion which already given. Of course this is not documented because it's not public API and I would stick to that since it exists from ages.

Copy link
Contributor

@zch93 zch93 Aug 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense, thanks for clarifying
on the “built-in function name suggestion”: understood, it is internal and long-standing, so i am fine relying on it. if feasible, a short internal comment pointing to where that suggestion lives would help new contributors

bottom line: +1 to keep the api minimal as proposed

@github-actions github-actions bot added the community-reviewed PR has been reviewed by the community. label Aug 22, 2025
@github-actions github-actions bot added community-reviewed PR has been reviewed by the community. and removed community-reviewed PR has been reviewed by the community. labels Aug 23, 2025
@github-actions github-actions bot added community-reviewed PR has been reviewed by the community. and removed community-reviewed PR has been reviewed by the community. labels Aug 23, 2025
@github-actions github-actions bot added community-reviewed PR has been reviewed by the community. and removed community-reviewed PR has been reviewed by the community. labels Aug 25, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
community-reviewed PR has been reviewed by the community.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants