
feat: delete Kafka consumer group on drop #20065

Open · xxchan wants to merge 9 commits into main

Conversation

xxchan (Member) commented Jan 8, 2025

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

fix #18416

Delete the Kafka consumer group when deleting source fragments (backfill is handled in the next PR), i.e., on:

  • DROP (shared) SOURCE
  • DROP MV on (non-shared) source

Implementation details (a minimal sketch follows this list):

  • The deletion is done in meta, when handling source changes.
  • The source manager sends a command to the worker (which owns the SplitEnumerator).
  • If an error happens during deletion, it is ignored.
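
For illustration, a minimal sketch of that flow. This is not the actual meta-side code: SourceWorkerCommand::DropFragments and the surrounding names are assumptions; only SourceWorkerCommand::Tick and on_drop_fragments appear in this PR's diff.

use std::collections::HashMap;
use tokio::sync::mpsc::UnboundedSender;

type SourceId = u32;

enum SourceWorkerCommand {
    /// Ask the worker's SplitEnumerator to clean up the dropped fragments
    /// (for Kafka: delete the per-fragment consumer groups).
    DropFragments(Vec<u32>),
}

/// Called while the source manager handles a source change
/// (DROP SOURCE, or DROP MV on a non-shared source).
fn drop_source_fragments(
    worker_txs: &HashMap<SourceId, UnboundedSender<SourceWorkerCommand>>,
    dropped: HashMap<SourceId, Vec<u32>>,
) {
    for (source_id, fragment_ids) in dropped {
        if let Some(tx) = worker_txs.get(&source_id) {
            // Best effort: consumer-group cleanup must never fail the DDL,
            // so a failure here is only logged and then ignored.
            if let Err(e) = tx.send(SourceWorkerCommand::DropFragments(fragment_ids)) {
                tracing::warn!(source_id, error = %e, "failed to send drop-fragments command");
            }
        }
    }
}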

Checklist

  • I have written necessary rustdoc comments.
  • I have added necessary unit tests and integration tests.
  • I have added test labels as necessary.
  • I have added fuzzing tests or opened an issue to track them.
  • My PR contains breaking changes.
  • My PR changes performance-critical code, so I will run (micro) benchmarks and present the results.
  • My PR contains critical fixes that are necessary to be merged into the latest release.

Documentation

  • My PR needs documentation updates.
Release note

Kafka consumer groups created by the RisingWave Kafka source are now deleted when the corresponding sources or related materialized views are dropped.

xxchan force-pushed the xxchan/split_source branch from 7d1afa8 to 7cbe311 on January 8, 2025 06:23
xxchan force-pushed the xxchan/split_source branch from 7cbe311 to 1dc89f3 on January 8, 2025 10:00
xxchan force-pushed the xxchan/split_source branch from 1dc89f3 to 00d90c8 on January 8, 2025 13:04
Base automatically changed from xxchan/split_source to main on January 8, 2025 13:51
xxchan marked this pull request as ready for review on January 8, 2025 13:57
xxchan requested a review from a team as a code owner on January 9, 2025 06:34
xxchan requested a review from fuyufjh on January 9, 2025 06:34

stdrc (Member) left a comment

LGTM

xxchan (Member Author) commented Jan 9, 2025

2025-01-09T06:54:23.898047083Z DEBUG risingwave_connector::source::kafka::enumerator: delete groups result: [Ok("my_group_non_shared-543")] topic="test_consumer_group_non_shared" fragment_ids=[543]

failed to run `e2e_test/source_inline/kafka/consumer_group_non_shared.slt.serial`

Caused by:
  system command stdout mismatch:
  [command] ./e2e_test/source_inline/kafka/consumer_group.mjs count-groups | grep my_group_non_shared
  [Diff] (-expected|+actual)
  -   my_group_non_shared: 1
  +   my_group_non_shared: 2
  at e2e_test/source_inline/kafka/consumer_group_non_shared.slt.serial:115

strange
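
As an aside, the per-fragment group id in the debug log above appears to be composed as <group.id.prefix>-<fragment_id>. A tiny illustrative helper (the function name is hypothetical, and the format is inferred from that log line rather than taken from this diff):

fn consumer_group_id(group_id_prefix: &str, fragment_id: u32) -> String {
    // Inferred format: ("my_group_non_shared", 543) -> "my_group_non_shared-543",
    // matching the group deleted in the log above.
    format!("{group_id_prefix}-{fragment_id}")
}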

xxchan added 2 commits January 9, 2025 18:44
xxchan added 7 commits January 9, 2025 18:44

tabVersion (Contributor) left a comment

One more thing to confirm: we do not remove the source backfill's consumer group until the relevant streaming job is dropped.

@@ -196,6 +168,70 @@ impl SplitEnumerator for KafkaSplitEnumerator {

        Ok(ret)
    }

    async fn on_drop_fragments(&mut self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
        let admin = build_kafka_admin(&self.config, &self.properties).await?;

Contributor:

Does this mean we need to create an admin client each time we drop a consumer group? Shall we reuse the client, just like we reuse the one used for the normal split listing?

xxchan (Member Author):

I'm worried that this increases unnecessary idle clients.

xxchan (Member Author):

A user also once asked why we don't close the connection after fetching metadata: #18949 (comment)

"If that is the case, why not close the consumer after fetching the metadata?"

Contributor:

This seems to be a limitation of the Kafka library: even just for fetching metadata, we need to create consumers, and the consumers will immediately connect to all brokers (instead of only the bootstrap server).

"If that is the case, why not close the consumer after fetching the metadata?"

I think the user mistakenly believes we do nothing after the first fetch. But actually, we keep querying the metadata, and we cannot afford to recreate a client for each tick.

"I'm worried that this increases unnecessary idle clients."

It is just one additional client per cluster, per source. I think that should be acceptable. If you still have concerns, we could run a benchmark against MSK to see how much creating multiple clients simultaneously impacts the broker and the local server.

xxchan (Member Author):

"we cannot afford to recreate a client for each tick"

For tick, yes; but the admin client used for drop_fragments here is by nature short-lived.

Contributor:

As I mentioned before,

"I am afraid that iterating over all sources and creating an admin client for each one during migration will put too much pressure on the Kafka broker."

The concern is about creating many clients in a short period of time, rather than how long each client lives.

And again, a benchmark can resolve our gap here; I hope it corrects my misunderstanding of the overhead of creating Kafka clients.
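
To make the shape of the change concrete, here is a minimal, self-contained sketch (assuming the rdkafka crate's admin API) of deleting per-fragment consumer groups with a short-lived admin client. The helper name and the group-id format are not taken from this PR's diff; the format is only inferred from the debug log earlier in the conversation.

use rdkafka::admin::{AdminClient, AdminOptions};
use rdkafka::client::DefaultClientContext;
use rdkafka::error::KafkaResult;
use rdkafka::ClientConfig;

async fn delete_fragment_groups(
    brokers: &str,
    group_id_prefix: &str,
    fragment_ids: &[u32],
) -> KafkaResult<()> {
    // A fresh admin client per call: it lives only for the duration of the DROP,
    // at the cost of one extra broker connection while the deletion runs.
    let admin: AdminClient<DefaultClientContext> = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .create()?;

    let group_ids: Vec<String> = fragment_ids
        .iter()
        .map(|id| format!("{group_id_prefix}-{id}"))
        .collect();
    let group_refs: Vec<&str> = group_ids.iter().map(String::as_str).collect();

    // Each entry is Ok(group) or Err((group, error code)); the PR ignores failures,
    // so a caller would just log this result.
    let results = admin.delete_groups(&group_refs, &AdminOptions::new()).await?;
    tracing::debug!(?results, "delete groups result");
    Ok(())
}

Whether this per-drop client creation is acceptable at migration-scale fan-out is exactly the open question above; the suggested MSK benchmark would settle it.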

            }
        }
    } else {
        for (source_id, fragment_ids) in &mut self.source_fragments {

Contributor:

I am afraid that iterating over all sources and creating an admin client for each one during migration will put too much pressure on the Kafka broker.

src/connector/src/source/kafka/enumerator.rs

Comment on lines +346 to +354

    /// Force [`ConnectorSourceWorker::tick()`] to be called.
    pub async fn force_tick(&self) -> MetaResult<()> {
        let (tx, rx) = oneshot::channel();
        self.send_command(SourceWorkerCommand::Tick(tx))?;
        rx.await
            .context("failed to receive tick command response from source worker")?
            .context("source worker tick failed")?;
        Ok(())
    }

Contributor:

I don't fully agree with making force_tick rely on the command channel. In the previous implementation, each enumerator's force_tick was independent of the others, but in this implementation each request to force_tick must be handled sequentially.

Consider this case: the channel contains (drop_fragment(x), force_tick(y)). While handling drop_fragment(x), the network to x's broker is unstable and the call may hang for a while, or forever, which blocks the call to force_tick(y).

A real-world case: we call force_tick when creating a source. To the user it looks like everything is fine but the SQL never returns, even though the root cause is irrelevant to the source being created. That can cause a lot of trouble during a POC.

xxchan (Member Author):

"A real-world case: we call force_tick when creating a source"

No, it will only be called in rare cases, when splits is None. When handling CREATE SOURCE, we call create_source_worker, which ticks before creating the source worker.

xxchan (Member Author):

I don't think it's a big deal to handle force_tick separately here. If drop_fragment hangs, tick is also unlikely to succeed.

Contributor:

"If drop_fragment hangs, tick is also unlikely to succeed"

I don't quite follow the logic here. drop_fragment and force_tick can happen to different sources; why would they tend to fail together?

xxchan (Member Author) commented Jan 10, 2025:

One ConnectorSourceWorker corresponds to one SplitEnumerator.

"In the previous implementation, each enumerator's force_tick was independent of the others"

Enumerators are still independent of each other now; this has not changed.
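
To illustrate the sequencing concern from this thread, here is a minimal, self-contained sketch (the names are illustrative, not the PR's): commands on one worker's channel are drained one at a time, so a slow DropFragments delays a later Tick on the same worker, while workers for other sources, each with their own channel and enumerator, are unaffected.

use std::time::Duration;
use tokio::sync::{mpsc, oneshot};

enum Cmd {
    DropFragments(Vec<u32>),
    Tick(oneshot::Sender<()>),
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel();

    // One worker per source: it drains its own command queue sequentially.
    tokio::spawn(async move {
        while let Some(cmd) = rx.recv().await {
            match cmd {
                Cmd::DropFragments(ids) => {
                    // Simulate a slow or unresponsive broker during group deletion.
                    println!("dropping fragments {ids:?} (slow)...");
                    tokio::time::sleep(Duration::from_secs(2)).await;
                }
                Cmd::Tick(done) => {
                    println!("tick");
                    let _ = done.send(());
                }
            }
        }
    });

    tx.send(Cmd::DropFragments(vec![543])).unwrap();
    let (done_tx, done_rx) = oneshot::channel();
    tx.send(Cmd::Tick(done_tx)).unwrap();
    // The tick only completes after the slow drop finishes (about 2 seconds later).
    done_rx.await.unwrap();
}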

Successfully merging this pull request may close these issues.

delete Kafka consumer group when job is dropped