Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(swordfish): Parallel expression evaluation #3593

Open
wants to merge 10 commits into
base: main
Choose a base branch
from

Conversation

colin-ho
Copy link
Contributor

@colin-ho colin-ho commented Dec 17, 2024

Addresses: #3389. More generally, this PR optimizes for projections with many expressions, particularly memory intensive expressions like UDFs.

Problem:

Currently, swordfish parallelizes projections across morsels, with 1 CPU per morsel. However, if each projection has many memory intensive expressions, we could experience a massive inflation in memory because we will have many materialized morsels living in memory at once.

Proposed solution:

Instead, we can parallelize the expressions within the projection (but only for expressions that require compute). This way, we still have good CPU utilization, but we keep a lower number of materialized morsels in memory.

In the linked issue above, we see that a 128cpu machine will parallelize morsels across the cores, each doing multiple udfs, resulting in "317GB allocations and duration 351 secs".

This PR reduces that to 7.8GB peak memory and runtime of 66 seconds.
Screenshot 2024-12-17 at 3 54 06 PM

Notes:

  • Found a bug with the loole channels where an async send to a sync receive was not respecting capacity constraints, and was allowing sends even though the receive did not happen. Moved over to https://github.com/fereidani/kanal, which worked much better.

Todos for next time:

  • We should also be able to parallelize expression evaluation within a single expression, since it is a tree. We can calculate max width of the tree and set that as max parallel tasks.

@github-actions github-actions bot added the perf label Dec 17, 2024
Copy link

codspeed-hq bot commented Dec 18, 2024

CodSpeed Performance Report

Merging #3593 will improve performances by 40.97%

Comparing colin/par-eval-expr (ec3702b) with main (43bbbeb)

Summary

⚡ 2 improvements
✅ 25 untouched benchmarks

Benchmarks breakdown

Benchmark main colin/par-eval-expr Change
test_count[1 Small File] 3.8 ms 3.3 ms +15.45%
test_iter_rows_first_row[100 Small Files] 214.2 ms 151.9 ms +40.97%

Copy link

codecov bot commented Dec 18, 2024

Codecov Report

Attention: Patch coverage is 92.47312% with 14 lines in your changes missing coverage. Please review.

Project coverage is 78.09%. Comparing base (43bbbeb) to head (ec3702b).
Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
src/daft-dsl/src/expr/mod.rs 68.00% 8 Missing ⚠️
...ft-local-execution/src/intermediate_ops/project.rs 92.00% 4 Missing ⚠️
src/daft-local-execution/src/run.rs 91.30% 2 Missing ⚠️
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #3593      +/-   ##
==========================================
+ Coverage   78.06%   78.09%   +0.03%     
==========================================
  Files         728      728              
  Lines       90044    90163     +119     
==========================================
+ Hits        70294    70417     +123     
+ Misses      19750    19746       -4     
Files with missing lines Coverage Δ
src/daft-connect/src/op/execute/root.rs 96.00% <100.00%> (ø)
src/daft-connect/src/op/execute/write.rs 80.24% <100.00%> (+0.24%) ⬆️
src/daft-local-execution/src/channel.rs 98.14% <100.00%> (-0.10%) ⬇️
src/daft-local-execution/src/dispatcher.rs 93.52% <100.00%> (ø)
...-execution/src/intermediate_ops/intermediate_op.rs 83.33% <100.00%> (+0.66%) ⬆️
src/daft-local-execution/src/pipeline.rs 87.93% <100.00%> (+0.08%) ⬆️
src/daft-local-execution/src/runtime_stats.rs 67.17% <100.00%> (-0.74%) ⬇️
...rc/daft-local-execution/src/sinks/blocking_sink.rs 85.15% <100.00%> (ø)
...c/daft-local-execution/src/sinks/streaming_sink.rs 82.01% <100.00%> (+0.71%) ⬆️
src/daft-local-execution/src/sources/source.rs 58.44% <100.00%> (ø)
... and 5 more

... and 2 files with indirect coverage changes

@@ -46,7 +46,7 @@ impl Session {
let cfg = Arc::new(DaftExecutionConfig::default());
let native_executor = NativeExecutor::from_logical_plan_builder(&optimized_plan)?;

let mut result_stream = native_executor.run(&pset, cfg, None)?.into_stream();
let mut result_stream = pin!(native_executor.run(&pset, cfg, None)?.into_stream());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The kanal receiver doesn't provide a way to get an owned stream, so I manually made one using unfold, with a recv.await inside. Because of this await point I need to pin the stream.

@@ -36,7 +32,7 @@ pub(crate) fn create_ordering_aware_receiver_channel<T: Clone>(
) -> (Vec<Sender<T>>, OrderingAwareReceiver<T>) {
match ordered {
true => {
let (senders, receiver) = (0..buffer_size).map(|_| create_channel::<T>(1)).unzip();
let (senders, receiver) = (0..buffer_size).map(|_| create_channel::<T>(0)).unzip();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting channel sizes to 0 can elide heap allocation.

@colin-ho colin-ho marked this pull request as ready for review December 19, 2024 00:38
@colin-ho colin-ho requested a review from samster25 December 19, 2024 01:45
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant