Skip to content

Commit

Permalink
Join utilization_task when stopping topology
Browse files Browse the repository at this point in the history
  • Loading branch information
esensar committed Jan 10, 2025
1 parent bb5d7ca commit d7d8694
Showing 1 changed file with 7 additions and 1 deletion.
8 changes: 7 additions & 1 deletion src/topology/running.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,21 @@ impl RunningTopology {
// pump in self.tasks, and the other for source in self.source_tasks.
let mut check_handles = HashMap::<ComponentKey, Vec<_>>::new();

let map_closure = |_result| ();

// We need to give some time to the sources to gracefully shutdown, so
// we will merge them with other tasks.
for (key, task) in self.tasks.into_iter().chain(self.source_tasks.into_iter()) {
let task = task.map(|_result| ()).shared();
let task = task.map(map_closure).shared();

wait_handles.push(task.clone());
check_handles.entry(key).or_default().push(task);
}

if let Some(utilization_task) = self.utilization_task {
wait_handles.push(utilization_task.map(map_closure).shared());
}

// If we reach this, we will forcefully shutdown the sources. If None, we will never force shutdown.
let deadline = self
.graceful_shutdown_duration
Expand Down

0 comments on commit d7d8694

Please sign in to comment.