Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perf: remove clone on uninitiated_partitions in SortPreservingMergeStream #15562

Merged
merged 4 commits into from
Apr 5, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions datafusion/physical-plan/src/sorts/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,8 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
// we skip the following block. Until then, this function may be called multiple
// times and can return Poll::Pending if any partition returns Poll::Pending.
if self.loser_tree.is_empty() {
let remaining_partitions = self.uninitiated_partitions.clone();
for i in remaining_partitions {
match self.maybe_poll_stream(cx, i) {
while let Some(&partition_idx) = self.uninitiated_partitions.front() {
match self.maybe_poll_stream(cx, partition_idx) {
Poll::Ready(Err(e)) => {
self.aborted = true;
return Poll::Ready(Some(Err(e)));
Expand All @@ -228,10 +227,8 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
// If a partition returns Poll::Pending, to avoid continuously polling it
// and potentially increasing upstream buffer sizes, we move it to the
// back of the polling queue.
if let Some(front) = self.uninitiated_partitions.pop_front() {
// This pop_front can never return `None`.
self.uninitiated_partitions.push_back(front);
}
self.uninitiated_partitions.rotate_left(1);

// This function could remain in a pending state, so we manually wake it here.
// However, this approach can be investigated further to find a more natural way
// to avoid disrupting the runtime scheduler.
Expand All @@ -241,10 +238,13 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
_ => {
// If the polling result is Poll::Ready(Some(batch)) or Poll::Ready(None),
// we remove this partition from the queue so it is not polled again.
self.uninitiated_partitions.retain(|idx| *idx != i);
self.uninitiated_partitions.pop_front();
}
}
}

// Claim the memory for the uninitiated partitions
self.uninitiated_partitions.shrink_to_fit();
self.init_loser_tree();
}

Expand Down