Skip to content

[FLINK-37436] Fix that flink-connector-pulsar used a incorrect API when dynamically creating topics by DynamicTopicRouter #104

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

poorbarcode
Copy link

@poorbarcode poorbarcode commented Mar 7, 2025

Purpose of the change

Background 1: dynamically create a Pulsar Topic by Flink connector-pulsar

Flink connector-pulsar provided a way to dynamically create a Pulsar Topic when DynamicTopicRouter returns a non-existing one. see also: flink-connector-pulsar/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/ProducerRegister.java at main · apache/flink-connector-pulsar

  • pulsarClient.getPartitionsForTopic(topic) will create a topic automatically if it does not exist.

Background 2:  how dynamically created topics in Pulsar Server

  • There is a config named allowAutoTopicCreationType, which can be set to partitioned or non-partitioned
  • If it was set partitioned, Pulsar will create a partitioned topic with {defaultNumPartitions} partitions. For example, Pulsar will create topics named {tenant}/{namespace}/{topic name}-partition-0 and {tenant}/{namespace}/{topic name}-partition-1, and create a relationship between them, which indicates they are in a same partitioned topic.
  • If it was set non-partitioned, Pulsar will create a non-partitioned topic. Pulsar will create topics named {tenant}/{namespace}/{topic name}, which does not include a suffix partition-{num}.

Issue:

  • if pulsarClient.getPartitionsForTopic(topic) get a param {tenant}/{namespace}/{topic name}-partition-0, which includes the suffix partition-0, Pulsar will create a non-partitioned topic named {tenant}/{namespace}/{topic name}-partition-0
  • After you call pulsarClient.getPartitionsForTopic(topic) with a param {tenant}/{namespace}/{topic name}-partition-1, you will get two partitions named {tenant}/{namespace}/{topic name}-partition-0 and {tenant}/{namespace}/{topic name}-partition-1, but there is no relationship record between them.

Relates to https://issues.apache.org/jira/projects/FLINK/issues/FLINK-37436

Brief change log

Fix the incorrect API calling.

Verifying this change

This change is a minor change and don't have any tests.

Significant changes

(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for
convenience.)

  • Dependencies have been added or upgraded
  • Public API has been changed (Public API is any class annotated with @Public(Evolving))
  • Serializers have been changed
  • New feature has been introduced
    • If yes, how is this documented? (not applicable / docs / JavaDocs / not documented)

…en dynamically creating topics by DynamicTopicRouter
Copy link

boring-cyborg bot commented Mar 7, 2025

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant