feat: delete Kafka consumer group when source backfill is finished #20077

Open · wants to merge 1 commit into base: xxchan/drop-cg
7 changes: 3 additions & 4 deletions e2e_test/source_inline/kafka/consumer_group.slt.serial
@@ -73,13 +73,12 @@ system ok

# Test delete consumer group on drop

-# my_group: 1 source fragment, 1 backfill fragment, 1 batch query
-# TODO: drop backfill fragment on backfill finish
+# my_group: 1 source fragment, 1 batch query (the backfill fragment's group is already dropped after backfill finishes)
# We only check my_group to avoid interfering with other tests.
system ok retry 3 backoff 5s
./e2e_test/source_inline/kafka/consumer_group.mjs count-groups | grep my_group
----
-my_group: 3
+my_group: 2


statement ok
@@ -90,7 +89,7 @@
sleep 1s
system ok retry 3 backoff 5s
./e2e_test/source_inline/kafka/consumer_group.mjs count-groups | grep my_group
----
-my_group: 2
+my_group: 1


system ok
4 changes: 4 additions & 0 deletions src/connector/src/source/base.rs
@@ -138,6 +138,10 @@ pub trait SplitEnumerator: Sized + Send {
    async fn on_drop_fragments(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        Ok(())
    }
+    /// Do some cleanup work when a backfill fragment is finished, e.g., drop the Kafka consumer group.
+    async fn on_finish_backfill(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
+        Ok(())
+    }
}

pub type SourceContextRef = Arc<SourceContext>;
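The new hook mirrors `on_drop_fragments`: a provided method on the `SplitEnumerator` trait that defaults to a no-op, so only connectors with real cleanup work need to override it. A minimal sketch of this default-method pattern, with hypothetical `SplitCleanup`/`KafkaLikeEnumerator` names and synchronous methods in place of the real trait's async ones:

// Sketch only: hypothetical names, not RisingWave's actual API.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

trait SplitCleanup {
    // Default no-op, so existing implementors compile unchanged.
    fn on_finish_backfill(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        Ok(())
    }
}

struct KafkaLikeEnumerator;

impl SplitCleanup for KafkaLikeEnumerator {
    // Only connectors with actual cleanup work override the hook.
    fn on_finish_backfill(&mut self, fragment_ids: Vec<u32>) -> Result<()> {
        println!("dropping consumer groups for fragments {fragment_ids:?}");
        Ok(())
    }
}

fn main() -> Result<()> {
    KafkaLikeEnumerator.on_finish_backfill(vec![1, 2])
}

Defaulting to `Ok(())` keeps the change non-breaking for every other connector.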
40 changes: 24 additions & 16 deletions src/connector/src/source/kafka/enumerator.rs
@@ -67,7 +67,25 @@ pub struct KafkaSplitEnumerator {
    config: rdkafka::ClientConfig,
}

-impl KafkaSplitEnumerator {}
+impl KafkaSplitEnumerator {
+    async fn drop_consumer_groups(&self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
+        let admin = build_kafka_admin(&self.config, &self.properties).await?;
+        let group_ids = fragment_ids
+            .iter()
+            .map(|fragment_id| self.properties.group_id(*fragment_id))
+            .collect::<Vec<_>>();
+        let group_ids: Vec<&str> = group_ids.iter().map(|s| s.as_str()).collect();
+        let res = admin
+            .delete_groups(&group_ids, &AdminOptions::default())
+            .await?;
+        tracing::debug!(
+            topic = self.topic,
+            ?fragment_ids,
+            "delete groups result: {res:?}"
+        );
+        Ok(())
+    }
+}

#[async_trait]
impl SplitEnumerator for KafkaSplitEnumerator {
@@ -170,21 +188,11 @@ impl SplitEnumerator for KafkaSplitEnumerator {
    }

    async fn on_drop_fragments(&mut self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
-        let admin = build_kafka_admin(&self.config, &self.properties).await?;
-        let group_ids = fragment_ids
-            .iter()
-            .map(|fragment_id| self.properties.group_id(*fragment_id))
-            .collect::<Vec<_>>();
-        let group_ids: Vec<&str> = group_ids.iter().map(|s| s.as_str()).collect();
-        let res = admin
-            .delete_groups(&group_ids, &AdminOptions::default())
-            .await?;
-        tracing::debug!(
-            topic = self.topic,
-            ?fragment_ids,
-            "delete groups result: {res:?}"
-        );
-        Ok(())
+        self.drop_consumer_groups(fragment_ids).await
    }
+
+    async fn on_finish_backfill(&mut self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
+        self.drop_consumer_groups(fragment_ids).await
+    }
}
}

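The refactor above only hoists the existing deletion logic out of `on_drop_fragments` into a shared `drop_consumer_groups` helper so both hooks can reuse it. For context, deleting consumer groups through rust-rdkafka's admin client looks roughly like the standalone sketch below; the broker address and group ids are placeholders, and `build_kafka_admin` in the PR is RisingWave's own wrapper around this kind of setup:

use rdkafka::ClientConfig;
use rdkafka::admin::{AdminClient, AdminOptions};
use rdkafka::client::DefaultClientContext;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder broker address.
    let admin: AdminClient<DefaultClientContext> = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .create()?;

    // Hypothetical group ids; the PR derives one per backfill fragment.
    let groups = ["my_group-1", "my_group-2"];
    let results = admin.delete_groups(&groups, &AdminOptions::default()).await?;

    // One result per group: Ok(group) on success, Err((group, error_code)) otherwise.
    for r in results {
        match r {
            Ok(g) => println!("deleted {g}"),
            Err((g, e)) => eprintln!("failed to delete {g}: {e}"),
        }
    }
    Ok(())
}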
11 changes: 10 additions & 1 deletion src/meta/src/stream/source_manager.rs
@@ -182,6 +182,14 @@ impl SourceManagerCore {
        }

        for (source_id, fragments) in added_backfill_fragments {
+            // Note: by the time the backfill fragment is considered created, backfill must have finished.
+            let handle = self.managed_sources.get(&source_id).unwrap_or_else(|| {
+                panic!(
+                    "source {} not found when adding backfill fragments {:?}",
+                    source_id, fragments
+                );
+            });
+            handle.finish_backfill(fragments.iter().map(|(id, _up_id)| *id).collect());
            self.backfill_fragments
                .entry(source_id)
                .or_default()
@@ -471,7 +479,8 @@
}

pub enum SourceChange {
-    /// `CREATE SOURCE` (shared), or `CREATE MV`
+    /// `CREATE SOURCE` (shared), or `CREATE MV`.
+    /// Note this is applied _after_ the job is successfully created (`post_collect` barrier).
    CreateJob {
        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        added_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
20 changes: 20 additions & 0 deletions src/meta/src/stream/source_manager/worker.rs
@@ -241,6 +241,12 @@ impl<P: SourceProperties> ConnectorSourceWorker<P> {
                    tracing::warn!(error = %e.as_report(), "error happened when drop fragment");
                }
            }
+            SourceWorkerCommand::FinishBackfill(fragment_ids) => {
+                if let Err(e) = self.finish_backfill(fragment_ids).await {
+                    // When an error happens, we just log it and ignore it.
+                    tracing::warn!(error = %e.as_report(), "error happened when finishing backfill");
+                }
+            }
            SourceWorkerCommand::Terminate => {
                return;
            }
@@ -287,6 +293,11 @@ impl<P: SourceProperties> ConnectorSourceWorker<P> {
        self.enumerator.on_drop_fragments(fragment_ids).await?;
        Ok(())
    }
+
+    async fn finish_backfill(&mut self, fragment_ids: Vec<FragmentId>) -> MetaResult<()> {
+        self.enumerator.on_finish_backfill(fragment_ids).await?;
+        Ok(())
+    }
}

/// Handle for a running [`ConnectorSourceWorker`].
@@ -360,6 +371,13 @@ impl ConnectorSourceWorkerHandle {
        }
    }
+
+    pub fn finish_backfill(&self, fragment_ids: Vec<FragmentId>) {
+        if let Err(e) = self.send_command(SourceWorkerCommand::FinishBackfill(fragment_ids)) {
+            // ignore error, just log it
+            tracing::warn!(error = %e.as_report(), "failed to finish backfill");
+        }
+    }

    pub fn terminate(&self, dropped_fragments: Option<BTreeSet<FragmentId>>) {
        if let Some(dropped_fragments) = dropped_fragments {
            self.drop_fragments(dropped_fragments.into_iter().collect());
@@ -378,6 +396,8 @@ pub enum SourceWorkerCommand {
    Tick(#[educe(Debug(ignore))] oneshot::Sender<MetaResult<()>>),
    /// Async command to drop a fragment.
    DropFragments(Vec<FragmentId>),
+    /// Async command to finish backfill.
+    FinishBackfill(Vec<FragmentId>),
    /// Terminate the worker task.
    Terminate,
}
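`SourceWorkerCommand` completes the worker's small actor-style protocol: `Tick` is request/response through a `oneshot` reply channel, while `DropFragments` and `FinishBackfill` are fire-and-forget, with failures merely logged (as in `finish_backfill` above). A minimal self-contained sketch of that split, with illustrative `Ping`/`Log` commands in place of the PR's real ones:

use tokio::sync::{mpsc, oneshot};

enum Command {
    // Request/response: the caller awaits the reply channel.
    Ping(oneshot::Sender<&'static str>),
    // Fire-and-forget: the worker handles it and never replies.
    Log(String),
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel();

    let worker = tokio::spawn(async move {
        while let Some(cmd) = rx.recv().await {
            match cmd {
                Command::Ping(reply) => {
                    let _ = reply.send("pong"); // ignore a dropped receiver
                }
                Command::Log(msg) => println!("worker: {msg}"),
            }
        }
    });

    let (reply_tx, reply_rx) = oneshot::channel();
    tx.send(Command::Ping(reply_tx)).unwrap();
    println!("got {}", reply_rx.await.unwrap());

    tx.send(Command::Log("finish backfill".into())).unwrap();
    drop(tx); // close the channel so the worker loop exits
    worker.await.unwrap();
}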