feat: delete Kafka consumer group when source backfill is finished
xxchan committed Jan 9, 2025
1 parent 5f945c8 commit b273cf4
Showing 5 changed files with 61 additions and 21 deletions.
7 changes: 3 additions & 4 deletions e2e_test/source_inline/kafka/consumer_group.slt.serial
@@ -73,13 +73,12 @@ system ok

# Test delete consumer group on drop

# my_group: 1 source fragment, 1 backfill fragment, 1 batch query
# TODO: drop backfill fragment on backfill finish
# my_group: 1 source fragment, 1 batch query (the backfill fragment's group is already dropped once backfill finishes)
# We only check my_group to avoid interfering with other tests.
system ok retry 3 backoff 5s
./e2e_test/source_inline/kafka/consumer_group.mjs count-groups | grep my_group
----
my_group: 3
my_group: 2


statement ok
@@ -90,7 +89,7 @@ sleep 1s
system ok retry 3 backoff 5s
./e2e_test/source_inline/kafka/consumer_group.mjs count-groups | grep my_group
----
my_group: 2
my_group: 1


system ok
4 changes: 4 additions & 0 deletions src/connector/src/source/base.rs
@@ -138,6 +138,10 @@ pub trait SplitEnumerator: Sized + Send {
async fn on_drop_fragments(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
Ok(())
}
/// Do some cleanup work when a backfill fragment is finished, e.g., drop the Kafka consumer group.
async fn on_finish_backfill(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
Ok(())
}
}

pub type SourceContextRef = Arc<SourceContext>;
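The new hook mirrors `on_drop_fragments`: it defaults to a no-op, so connectors with no per-fragment state to clean up need no changes. Below is a minimal, self-contained sketch of that default-method pattern, using simplified stand-in types rather than the actual RisingWave trait; it assumes the `async-trait` and `tokio` crates.

use async_trait::async_trait;

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

// Simplified stand-in for `SplitEnumerator`: both hooks default to no-ops,
// so existing connectors keep compiling when a new hook is added.
#[async_trait]
trait Enumerator: Sized + Send {
    async fn on_drop_fragments(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        Ok(())
    }
    async fn on_finish_backfill(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        Ok(())
    }
}

// A connector that needs cleanup overrides the hook; others inherit the no-op.
struct LoggingEnumerator;

#[async_trait]
impl Enumerator for LoggingEnumerator {
    async fn on_finish_backfill(&mut self, fragment_ids: Vec<u32>) -> Result<()> {
        println!("dropping per-fragment resources for {fragment_ids:?}");
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let mut enumerator = LoggingEnumerator;
    enumerator.on_finish_backfill(vec![1, 2]).await
}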
40 changes: 24 additions & 16 deletions src/connector/src/source/kafka/enumerator.rs
@@ -67,7 +67,25 @@ pub struct KafkaSplitEnumerator {
config: rdkafka::ClientConfig,
}

impl KafkaSplitEnumerator {}
impl KafkaSplitEnumerator {
async fn drop_consumer_groups(&self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
let admin = build_kafka_admin(&self.config, &self.properties).await?;
let group_ids = fragment_ids
.iter()
.map(|fragment_id| self.properties.group_id(*fragment_id))
.collect::<Vec<_>>();
let group_ids: Vec<&str> = group_ids.iter().map(|s| s.as_str()).collect();
let res = admin
.delete_groups(&group_ids, &AdminOptions::default())
.await?;
tracing::debug!(
topic = self.topic,
?fragment_ids,
"delete groups result: {res:?}"
);
Ok(())
}
}

#[async_trait]
impl SplitEnumerator for KafkaSplitEnumerator {
Expand Down Expand Up @@ -170,21 +188,11 @@ impl SplitEnumerator for KafkaSplitEnumerator {
}

async fn on_drop_fragments(&mut self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
let admin = build_kafka_admin(&self.config, &self.properties).await?;
let group_ids = fragment_ids
.iter()
.map(|fragment_id| self.properties.group_id(*fragment_id))
.collect::<Vec<_>>();
let group_ids: Vec<&str> = group_ids.iter().map(|s| s.as_str()).collect();
let res = admin
.delete_groups(&group_ids, &AdminOptions::default())
.await?;
tracing::debug!(
topic = self.topic,
?fragment_ids,
"delete groups result: {res:?}"
);
Ok(())
self.drop_consumer_groups(fragment_ids).await
}

async fn on_finish_backfill(&mut self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
self.drop_consumer_groups(fragment_ids).await
}
}

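For context, `drop_consumer_groups` reduces to a single rdkafka admin call. The standalone sketch below shows roughly what that call looks like; the broker address and group names are illustrative, whereas the real helper derives one group id per fragment via `self.properties.group_id(fragment_id)` and builds the client with `build_kafka_admin`. It assumes the `rdkafka` and `tokio` crates.

use rdkafka::admin::{AdminClient, AdminOptions};
use rdkafka::client::DefaultClientContext;
use rdkafka::ClientConfig;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Illustrative broker address and group ids only.
    let admin: AdminClient<DefaultClientContext> = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .create()?;

    // `delete_groups` reports success or failure per group, so deleting a
    // group that no longer exists does not fail the whole request.
    let groups = ["my_group-1", "my_group-2"];
    let results = admin
        .delete_groups(&groups, &AdminOptions::default())
        .await?;
    for result in results {
        println!("{result:?}");
    }
    Ok(())
}

Deleting the per-fragment group once backfill finishes keeps finished backfill fragments from leaving stale consumer groups on the broker, which is what the e2e test above asserts (the my_group count drops from 3 to 2).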
11 changes: 10 additions & 1 deletion src/meta/src/stream/source_manager.rs
@@ -182,6 +182,14 @@ impl SourceManagerCore {
}

for (source_id, fragments) in added_backfill_fragments {
// Note: by the time the backfill fragment is considered created, backfill must already be finished.
let handle = self.managed_sources.get(&source_id).unwrap_or_else(|| {
panic!(
"source {} not found when adding backfill fragments {:?}",
source_id, fragments
);
});
handle.finish_backfill(fragments.iter().map(|(id, _up_id)| *id).collect());
self.backfill_fragments
.entry(source_id)
.or_default()
@@ -471,7 +479,8 @@ impl SourceManager {
}

pub enum SourceChange {
/// `CREATE SOURCE` (shared), or `CREATE MV`
/// `CREATE SOURCE` (shared), or `CREATE MV`.
/// Note this is applied _after_ the job is successfully created (`post_collect` barrier).
CreateJob {
added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
added_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
20 changes: 20 additions & 0 deletions src/meta/src/stream/source_manager/worker.rs
@@ -241,6 +241,12 @@ impl<P: SourceProperties> ConnectorSourceWorker<P> {
tracing::warn!(error = %e.as_report(), "error happened when drop fragment");
}
}
SourceWorkerCommand::FinishBackfill(fragment_ids) => {
if let Err(e) = self.finish_backfill(fragment_ids).await {
// if an error occurs, just log it and ignore it
tracing::warn!(error = %e.as_report(), "error happened when finishing backfill");
}
}
SourceWorkerCommand::Terminate => {
return;
}
Expand Down Expand Up @@ -287,6 +293,11 @@ impl<P: SourceProperties> ConnectorSourceWorker<P> {
self.enumerator.on_drop_fragments(fragment_ids).await?;
Ok(())
}

async fn finish_backfill(&mut self, fragment_ids: Vec<FragmentId>) -> MetaResult<()> {
self.enumerator.on_finish_backfill(fragment_ids).await?;
Ok(())
}
}

/// Handle for a running [`ConnectorSourceWorker`].
Expand Down Expand Up @@ -360,6 +371,13 @@ impl ConnectorSourceWorkerHandle {
}
}

pub fn finish_backfill(&self, fragment_ids: Vec<FragmentId>) {
if let Err(e) = self.send_command(SourceWorkerCommand::FinishBackfill(fragment_ids)) {
// ignore error, just log it
tracing::warn!(error = %e.as_report(), "failed to finish backfill");
}
}

pub fn terminate(&self, dropped_fragments: Option<BTreeSet<FragmentId>>) {
if let Some(dropped_fragments) = dropped_fragments {
self.drop_fragments(dropped_fragments.into_iter().collect());
@@ -378,6 +396,8 @@ pub enum SourceWorkerCommand {
Tick(#[educe(Debug(ignore))] oneshot::Sender<MetaResult<()>>),
/// Async command to drop a fragment.
DropFragments(Vec<FragmentId>),
/// Async command to finish backfill.
FinishBackfill(Vec<FragmentId>),
/// Terminate the worker task.
Terminate,
}
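On the meta side the wiring is a fire-and-forget command channel: the handle sends `FinishBackfill` to the per-source worker and merely logs a send failure, since the worker may already have terminated. A reduced sketch of that pattern with simplified stand-in types (not the actual meta-service code), assuming `tokio`:

use tokio::sync::mpsc;

#[derive(Debug)]
enum WorkerCommand {
    FinishBackfill(Vec<u32>),
    Terminate,
}

struct WorkerHandle {
    tx: mpsc::UnboundedSender<WorkerCommand>,
}

impl WorkerHandle {
    // Fire-and-forget: a failed send only means the worker is gone, so log and move on.
    fn finish_backfill(&self, fragment_ids: Vec<u32>) {
        if let Err(e) = self.tx.send(WorkerCommand::FinishBackfill(fragment_ids)) {
            eprintln!("failed to notify worker: {e}");
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel();
    let handle = WorkerHandle { tx };

    // The worker loop drains commands, mirroring the `SourceWorkerCommand` handling above.
    let worker = tokio::spawn(async move {
        while let Some(cmd) = rx.recv().await {
            match cmd {
                WorkerCommand::FinishBackfill(ids) => {
                    println!("cleaning up consumer groups for fragments {ids:?}");
                }
                WorkerCommand::Terminate => break,
            }
        }
    });

    handle.finish_backfill(vec![7]);
    let _ = handle.tx.send(WorkerCommand::Terminate);
    worker.await.unwrap();
}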
