diff --git a/e2e_test/source_inline/kafka/consumer_group.slt.serial b/e2e_test/source_inline/kafka/consumer_group.slt.serial
index 29f41c82d6168..25d49dc9a5864 100644
--- a/e2e_test/source_inline/kafka/consumer_group.slt.serial
+++ b/e2e_test/source_inline/kafka/consumer_group.slt.serial
@@ -73,13 +73,12 @@ system ok
 
 # Test delete consumer group on drop
 
-# my_group: 1 source fragment, 1 backfill fragment, 1 batch query
-# TODO: drop backfill fragment on backfill finish
+# my_group: 1 source fragment, 1 batch query, (1 backfill fragment's group is already dropped after backfill finished)
 # We only check my_group to avoid interfering with other tests.
 system ok retry 3 backoff 5s
 ./e2e_test/source_inline/kafka/consumer_group.mjs count-groups | grep my_group
 ----
-my_group: 3
+my_group: 2
 
 
 statement ok
@@ -90,7 +89,7 @@ sleep 1s
 system ok retry 3 backoff 5s
 ./e2e_test/source_inline/kafka/consumer_group.mjs count-groups | grep my_group
 ----
-my_group: 2
+my_group: 1
 
 
 system ok
diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs
index 9bdafc047f7cc..b6fcb1bbbe614 100644
--- a/src/connector/src/source/base.rs
+++ b/src/connector/src/source/base.rs
@@ -138,6 +138,10 @@ pub trait SplitEnumerator: Sized + Send {
     async fn on_drop_fragments(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
         Ok(())
     }
+    /// Do some cleanup work when a backfill fragment is finished, e.g., drop Kafka consumer group.
+    async fn on_finish_backfill(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
+        Ok(())
+    }
 }
 
 pub type SourceContextRef = Arc<SourceContext>;
diff --git a/src/connector/src/source/kafka/enumerator.rs b/src/connector/src/source/kafka/enumerator.rs
index 4dd002e1567d6..80d1341e4a16b 100644
--- a/src/connector/src/source/kafka/enumerator.rs
+++ b/src/connector/src/source/kafka/enumerator.rs
@@ -67,7 +67,25 @@ pub struct KafkaSplitEnumerator {
     config: rdkafka::ClientConfig,
 }
 
-impl KafkaSplitEnumerator {}
+impl KafkaSplitEnumerator {
+    async fn drop_consumer_groups(&self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
+        let admin = build_kafka_admin(&self.config, &self.properties).await?;
+        let group_ids = fragment_ids
+            .iter()
+            .map(|fragment_id| self.properties.group_id(*fragment_id))
+            .collect::<Vec<_>>();
+        let group_ids: Vec<&str> = group_ids.iter().map(|s| s.as_str()).collect();
+        let res = admin
+            .delete_groups(&group_ids, &AdminOptions::default())
+            .await?;
+        tracing::debug!(
+            topic = self.topic,
+            ?fragment_ids,
+            "delete groups result: {res:?}"
+        );
+        Ok(())
+    }
+}
 
 #[async_trait]
 impl SplitEnumerator for KafkaSplitEnumerator {
@@ -170,21 +188,11 @@ impl SplitEnumerator for KafkaSplitEnumerator {
     }
 
     async fn on_drop_fragments(&mut self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
-        let admin = build_kafka_admin(&self.config, &self.properties).await?;
-        let group_ids = fragment_ids
-            .iter()
-            .map(|fragment_id| self.properties.group_id(*fragment_id))
-            .collect::<Vec<_>>();
-        let group_ids: Vec<&str> = group_ids.iter().map(|s| s.as_str()).collect();
-        let res = admin
-            .delete_groups(&group_ids, &AdminOptions::default())
-            .await?;
-        tracing::debug!(
-            topic = self.topic,
-            ?fragment_ids,
-            "delete groups result: {res:?}"
-        );
-        Ok(())
+        self.drop_consumer_groups(fragment_ids).await
+    }
+
+    async fn on_finish_backfill(&mut self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
+        self.drop_consumer_groups(fragment_ids).await
     }
 }
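Note: the enumerator-side cleanup above ultimately goes through Kafka's admin API. As a rough standalone illustration (not RisingWave code: it bypasses the build_kafka_admin helper, the broker address and group name are placeholders, and it assumes an rdkafka version that exposes delete_groups), deleting one consumer group looks roughly like this:

    use rdkafka::admin::{AdminClient, AdminOptions};
    use rdkafka::client::DefaultClientContext;
    use rdkafka::config::ClientConfig;
    use rdkafka::error::KafkaResult;

    /// Delete a single consumer group, roughly what `drop_consumer_groups` does
    /// for every `properties.group_id(fragment_id)` of the finished fragments.
    async fn delete_consumer_group(brokers: &str, group_id: &str) -> KafkaResult<()> {
        let admin: AdminClient<DefaultClientContext> = ClientConfig::new()
            .set("bootstrap.servers", brokers)
            .create()?;
        let results = admin
            .delete_groups(&[group_id], &AdminOptions::default())
            .await?;
        // Each element reports per-group success or a Kafka error code.
        for result in results {
            println!("delete group result: {result:?}");
        }
        Ok(())
    }

Dropping the group as soon as backfill finishes, instead of waiting for the fragment to be dropped, keeps stale backfill groups from lingering on the broker, which is why the expected my_group counts in the slt test above drop from 3/2 to 2/1.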
diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs
index 6f46310c4ff94..94458208a4668 100644
--- a/src/meta/src/stream/source_manager.rs
+++ b/src/meta/src/stream/source_manager.rs
@@ -182,6 +182,14 @@ impl SourceManagerCore {
         }
 
         for (source_id, fragments) in added_backfill_fragments {
+            // Note: when the backfill fragment is considered created, backfill must be finished.
+            let handle = self.managed_sources.get(&source_id).unwrap_or_else(|| {
+                panic!(
+                    "source {} not found when adding backfill fragments {:?}",
+                    source_id, fragments
+                );
+            });
+            handle.finish_backfill(fragments.iter().map(|(id, _up_id)| *id).collect());
             self.backfill_fragments
                 .entry(source_id)
                 .or_default()
@@ -471,7 +479,8 @@ impl SourceManager {
 }
 
 pub enum SourceChange {
-    /// `CREATE SOURCE` (shared), or `CREATE MV`
+    /// `CREATE SOURCE` (shared), or `CREATE MV`.
+    /// Note this is applied _after_ the job is successfully created (`post_collect` barrier).
     CreateJob {
         added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
         added_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
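Note: the handle.finish_backfill call above is fire-and-forget: the handle only enqueues a command for the per-source worker, and a send failure is logged rather than propagated. Because CreateJob is applied only after the job is successfully created (the post_collect barrier), backfill is already finished when the command is sent, per the note in the hunk. The worker.rs changes below implement that dispatch; a minimal standalone sketch of the shape, with hypothetical names (Command, WorkerHandle, worker_loop) rather than the actual RisingWave types:

    use tokio::sync::mpsc;

    #[derive(Debug)]
    enum Command {
        FinishBackfill(Vec<u32>),
        Terminate,
    }

    struct WorkerHandle {
        tx: mpsc::UnboundedSender<Command>,
    }

    impl WorkerHandle {
        /// Best-effort: if the worker has already gone away, log and move on.
        fn finish_backfill(&self, fragment_ids: Vec<u32>) {
            if let Err(e) = self.tx.send(Command::FinishBackfill(fragment_ids)) {
                eprintln!("failed to send FinishBackfill: {e}");
            }
        }
    }

    async fn worker_loop(mut rx: mpsc::UnboundedReceiver<Command>) {
        while let Some(cmd) = rx.recv().await {
            match cmd {
                // The real worker forwards this to SplitEnumerator::on_finish_backfill.
                Command::FinishBackfill(ids) => {
                    println!("dropping consumer groups for fragments {ids:?}")
                }
                Command::Terminate => return,
            }
        }
    }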

diff --git a/src/meta/src/stream/source_manager/worker.rs b/src/meta/src/stream/source_manager/worker.rs
index 5e2a89b757297..a6a133bf77353 100644
--- a/src/meta/src/stream/source_manager/worker.rs
+++ b/src/meta/src/stream/source_manager/worker.rs
@@ -241,6 +241,12 @@ impl<P: SourceProperties> ConnectorSourceWorker<P> {
                     tracing::warn!(error = %e.as_report(), "error happened when drop fragment");
                 }
             }
+            SourceWorkerCommand::FinishBackfill(fragment_ids) => {
+                if let Err(e) = self.finish_backfill(fragment_ids).await {
+                    // when error happens, we just log it and ignore
+                    tracing::warn!(error = %e.as_report(), "error happened when finish backfill");
+                }
+            }
             SourceWorkerCommand::Terminate => {
                 return;
             }
@@ -287,6 +293,11 @@ impl<P: SourceProperties> ConnectorSourceWorker<P> {
         self.enumerator.on_drop_fragments(fragment_ids).await?;
         Ok(())
     }
+
+    async fn finish_backfill(&mut self, fragment_ids: Vec<FragmentId>) -> MetaResult<()> {
+        self.enumerator.on_finish_backfill(fragment_ids).await?;
+        Ok(())
+    }
 }
 
 /// Handle for a running [`ConnectorSourceWorker`].
@@ -360,6 +371,13 @@ impl ConnectorSourceWorkerHandle {
         }
     }
 
+    pub fn finish_backfill(&self, fragment_ids: Vec<FragmentId>) {
+        if let Err(e) = self.send_command(SourceWorkerCommand::FinishBackfill(fragment_ids)) {
+            // ignore error, just log it
+            tracing::warn!(error = %e.as_report(), "failed to finish backfill");
+        }
+    }
+
     pub fn terminate(&self, dropped_fragments: Option<BTreeSet<FragmentId>>) {
         if let Some(dropped_fragments) = dropped_fragments {
             self.drop_fragments(dropped_fragments.into_iter().collect());
@@ -378,6 +396,8 @@ pub enum SourceWorkerCommand {
     Tick(#[educe(Debug(ignore))] oneshot::Sender<MetaResult<()>>),
     /// Async command to drop a fragment.
     DropFragments(Vec<FragmentId>),
+    /// Async command to finish backfill.
+    FinishBackfill(Vec<FragmentId>),
     /// Terminate the worker task.
     Terminate,
 }
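Note: the count-groups check used by the slt test presumably boils down to listing consumer groups on the broker and counting the ones whose name matches a prefix such as my_group. A rough Rust equivalent of that check (illustrative only; the test actually uses the consumer_group.mjs Node script, and the broker address here is a placeholder):

    use std::time::Duration;

    use rdkafka::config::ClientConfig;
    use rdkafka::consumer::{BaseConsumer, Consumer};
    use rdkafka::error::KafkaResult;

    /// Count consumer groups whose name contains `prefix`, e.g. "my_group".
    fn count_groups(brokers: &str, prefix: &str) -> KafkaResult<usize> {
        let consumer: BaseConsumer = ClientConfig::new()
            .set("bootstrap.servers", brokers)
            .create()?;
        // `None` lists all groups known to the broker.
        let group_list = consumer.fetch_group_list(None, Duration::from_secs(5))?;
        Ok(group_list
            .groups()
            .iter()
            .filter(|g| g.name().contains(prefix))
            .count())
    }

After this change the expected counts drop by one at each checkpoint, since the backfill fragment's group no longer survives past backfill completion.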