feat: delete Kafka consumer group on drop #20065
base: main
Conversation
This stack of pull requests is managed by Graphite.
Force-pushed from 7d1afa8 to 7cbe311
Force-pushed from 7cbe311 to 1dc89f3
Force-pushed from 1dc89f3 to 00d90c8
LGTM
strange
One more thing to confirm: we do not remove the source backfill's consumer group until the relevant streaming job is dropped.
@@ -196,6 +168,70 @@ impl SplitEnumerator for KafkaSplitEnumerator {
        Ok(ret)
    }

    async fn on_drop_fragments(&mut self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
        let admin = build_kafka_admin(&self.config, &self.properties).await?;
Does this mean we need to create an admin client each time we drop a consumer group? Shall we reuse the client, just like we reuse the one used for the normal list-splits call?
I'm worried that this adds unnecessary idle clients.
A user also once asked why we don't close the connection after fetching metadata:

> If that is the case, why not close the consumer after fetching the metadata?
This seems to be a limitation of the Kafka library. Even just for fetching metadata, we need to create consumers, and the consumers immediately connect to all brokers (instead of only the bootstrap server).

> If that is the case, why not close the consumer after fetching the metadata?

I think the user mistakenly believes we do nothing after the first fetch. Actually, we keep querying the metadata, and we cannot afford to recreate a client for each tick.
> I'm worried that this adds unnecessary idle clients.

It is just one additional client per cluster, per source; I think that should be acceptable. If you still have concerns, we can run a benchmark with MSK to see how much creating multiple clients simultaneously impacts the broker and the local server.
> we cannot afford to recreate a client for each tick.

For `tick`, yes; but for `drop_fragment` the client is by nature short-lived.
As I mentioned before: "I am afraid that iterating over all sources and creating an admin client for each one during migration will put too much pressure on the Kafka broker."
The concern is about creating many clients in a short period of time, not about how long each client lives.
And again, a benchmark can resolve our gap here; hopefully it can correct my misunderstanding of the overhead of creating Kafka clients.
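For context, here is a minimal sketch of the short-lived admin-client path being discussed, written directly against rdkafka's `AdminClient::delete_groups`. The function name, parameters, and error handling are illustrative only; this is not the actual `build_kafka_admin` / `on_drop_fragments` implementation in the PR:

```rust
use rdkafka::admin::{AdminClient, AdminOptions};
use rdkafka::client::DefaultClientContext;
use rdkafka::ClientConfig;

// Hypothetical helper: delete the consumer groups that belonged to the dropped
// fragments, using an admin client that lives only for this one call.
async fn delete_consumer_groups_on_drop(
    brokers: &str,
    group_ids: &[String],
) -> Result<(), Box<dyn std::error::Error>> {
    // Build the admin client just for the duration of the drop operation.
    let admin: AdminClient<DefaultClientContext> = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .create()?;

    let groups: Vec<&str> = group_ids.iter().map(String::as_str).collect();
    // `delete_groups` sends a single DeleteGroups request covering all groups.
    for result in admin.delete_groups(&groups, &AdminOptions::new()).await? {
        if let Err((group, err)) = result {
            // Best-effort: a group that never committed offsets may not exist.
            eprintln!("failed to delete consumer group {group}: {err}");
        }
    }
    Ok(())
}
```

Under this shape, a client is created and dropped per `on_drop_fragments` call, which is exactly the trade-off (extra connection churn vs. no extra idle client) the thread above is weighing.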
            }
        }
    } else {
        for (source_id, fragment_ids) in &mut self.source_fragments {
I am afraid that iterating over all sources and creating an admin client for each one during migration will put too much pressure on the Kafka broker.
/// Force [`ConnectorSourceWorker::tick()`] to be called.
pub async fn force_tick(&self) -> MetaResult<()> {
    let (tx, rx) = oneshot::channel();
    self.send_command(SourceWorkerCommand::Tick(tx))?;
    rx.await
        .context("failed to receive tick command response from source worker")?
        .context("source worker tick failed")?;
    Ok(())
}
I don't fully agree with making `force_tick` rely on the command channel. In the previous implementation, each enumerator's `force_tick` was independent of the others; in this implementation, every request to `force_tick` must be handled sequentially.
Consider a channel containing `(drop_fragment(x), force_tick(y))`: while handling `drop_fragment(x)`, the network to x's broker is unstable and the call may hang for a while, or forever, and that blocks the call to `force_tick(y)`.
A real-world case: we call `force_tick` when creating a source. To the user it looks like everything is fine but the SQL never returns, even though the root cause is irrelevant to the source being created. That can cause a lot of trouble during a POC.
> A real-world case: we call `force_tick` when creating a source.

No, it will only be called in rare cases when `splits` is `None`. On `CREATE SOURCE`, we call `create_source_worker`, which will `tick` before creating the source worker.
I don't think it's a big deal to handle `force_tick` separately here. If `drop_fragment` hangs, `tick` is also unlikely to succeed.
> If `drop_fragment` hangs, `tick` is also unlikely to succeed.

I don't quite follow the logic here. `drop_fragment` and `force_tick` can target different sources, so why would they tend to fail together?
One `ConnectorSourceWorker` corresponds to one `SplitEnumerator`.

> In the previous implementation, each enumerator's `force_tick` was independent of the others.

Enumerators are still independent of each other now; this has not changed.
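To make the "one worker per enumerator" point concrete, here is a hypothetical sketch of the layout being discussed; `SourceWorkerHandle`, `SourceManager`, and the `workers` map are illustrative names, not the actual meta-service code. Each source id owns its own worker task and command channel, so commands for different sources never queue behind each other:

```rust
use std::collections::HashMap;
use tokio::sync::{mpsc, oneshot};

// Commands handled sequentially by a single source's worker task.
enum SourceWorkerCommand {
    Tick(oneshot::Sender<Result<(), String>>),
    DropFragments(Vec<u32>),
}

// One handle (and thus one command queue) per source.
struct SourceWorkerHandle {
    tx: mpsc::UnboundedSender<SourceWorkerCommand>,
}

struct SourceManager {
    workers: HashMap<u32, SourceWorkerHandle>,
}

impl SourceManager {
    // Only commands queued ahead of this one *for the same source* can delay it;
    // a hanging DropFragments for source X never blocks a tick for source Y.
    async fn force_tick(&self, source_id: u32) -> Result<(), String> {
        let handle = self.workers.get(&source_id).ok_or("unknown source")?;
        let (tx, rx) = oneshot::channel();
        handle
            .tx
            .send(SourceWorkerCommand::Tick(tx))
            .map_err(|_| "worker exited".to_string())?;
        rx.await.map_err(|_| "worker dropped the response".to_string())?
    }
}
```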
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
fix #18416
Delete Kafka consumer group when deleting source fragments. (Backfill is handled in the next PR) i.e.,
Impl details:
Checklist
Documentation
Release note
Kafka consumer groups created by the RisingWave Kafka source are now deleted when the sources or related materialized views are dropped.