Skip to content

Commit 19672d1

Browse files
author
EC2 Default User
committed
par eval expr
1 parent e148248 commit 19672d1

File tree

18 files changed

+261
-73
lines changed

18 files changed

+261
-73
lines changed

Cargo.lock

+31-13
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/common/runtime/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ futures = {workspace = true}
44
lazy_static = {workspace = true}
55
log = {workspace = true}
66
oneshot = "0.1.8"
7+
thread-priority = "1.2.0"
78
tokio = {workspace = true}
89

910
[lints]

src/common/runtime/src/lib.rs

+4
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use std::{
1212
use common_error::{DaftError, DaftResult};
1313
use futures::FutureExt;
1414
use lazy_static::lazy_static;
15+
use thread_priority::set_current_thread_priority;
1516
use tokio::{
1617
runtime::{Handle, RuntimeFlavor},
1718
task::JoinSet,
@@ -149,6 +150,9 @@ fn init_compute_runtime() -> RuntimeRef {
149150
let id = COMPUTE_THREAD_ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
150151
format!("Compute-Thread-{}", id)
151152
})
153+
.on_thread_start(|| {
154+
set_current_thread_priority(thread_priority::ThreadPriority::Min).unwrap();
155+
})
152156
.max_blocking_threads(*COMPUTE_RUNTIME_MAX_BLOCKING_THREADS);
153157
Runtime::new(builder.build().unwrap(), PoolType::Compute)
154158
})

src/daft-dsl/src/expr/mod.rs

+32
Original file line numberDiff line numberDiff line change
@@ -1219,6 +1219,38 @@ impl Expr {
12191219
_ => None,
12201220
}
12211221
}
1222+
1223+
pub fn has_compute(&self) -> bool {
1224+
match self {
1225+
Self::Column(..) => false,
1226+
Self::Literal(..) => false,
1227+
Self::Subquery(..) => false,
1228+
Self::Exists(..) => false,
1229+
Self::OuterReferenceColumn(..) => false,
1230+
Self::Function { .. } => true,
1231+
Self::ScalarFunction(..) => true,
1232+
Self::Agg(_) => true,
1233+
Self::Alias(expr, ..) => expr.has_compute(),
1234+
Self::Cast(expr, ..) => expr.has_compute(),
1235+
Self::Not(expr) => expr.has_compute(),
1236+
Self::IsNull(expr) => expr.has_compute(),
1237+
Self::NotNull(expr) => expr.has_compute(),
1238+
Self::FillNull(expr, fill_value) => expr.has_compute() || fill_value.has_compute(),
1239+
Self::IsIn(expr, items) => {
1240+
expr.has_compute() || items.iter().any(|item| item.has_compute())
1241+
}
1242+
Self::Between(expr, lower, upper) => {
1243+
expr.has_compute() || lower.has_compute() || upper.has_compute()
1244+
}
1245+
Self::BinaryOp { left, right, .. } => left.has_compute() || right.has_compute(),
1246+
Self::IfElse {
1247+
if_true,
1248+
if_false,
1249+
predicate,
1250+
} => if_true.has_compute() || if_false.has_compute() || predicate.has_compute(),
1251+
Self::InSubquery(expr, _) => expr.has_compute(),
1252+
}
1253+
}
12221254
}
12231255

12241256
#[derive(Display, Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)]

src/daft-local-execution/Cargo.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,15 @@ daft-table = {path = "../daft-table", default-features = false}
2323
daft-writers = {path = "../daft-writers", default-features = false}
2424
futures = {workspace = true}
2525
indexmap = {workspace = true}
26+
kanal = "0.1.0-pre8"
2627
lazy_static = {workspace = true}
2728
log = {workspace = true}
28-
loole = "0.4.0"
2929
num-format = "0.4.4"
3030
pin-project = "1"
3131
pyo3 = {workspace = true, optional = true}
3232
snafu = {workspace = true}
3333
tokio = {workspace = true}
34+
tokio-stream = {workspace = true}
3435
tokio-util = {workspace = true}
3536
tracing = {workspace = true}
3637

src/daft-local-execution/src/channel.rs

+8-12
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,25 @@
11
#[derive(Clone)]
2-
pub(crate) struct Sender<T>(loole::Sender<T>);
2+
pub(crate) struct Sender<T>(kanal::AsyncSender<T>);
33
impl<T> Sender<T> {
4-
pub(crate) async fn send(&self, val: T) -> Result<(), loole::SendError<T>> {
5-
self.0.send_async(val).await
4+
pub(crate) async fn send(&self, val: T) -> Result<(), kanal::SendError> {
5+
self.0.send(val).await
66
}
77
}
88

99
#[derive(Clone)]
10-
pub(crate) struct Receiver<T>(loole::Receiver<T>);
10+
pub(crate) struct Receiver<T>(kanal::AsyncReceiver<T>);
1111
impl<T> Receiver<T> {
1212
pub(crate) async fn recv(&self) -> Option<T> {
13-
self.0.recv_async().await.ok()
13+
self.0.recv().await.ok()
1414
}
1515

16-
pub(crate) fn blocking_recv(&self) -> Option<T> {
17-
self.0.recv().ok()
18-
}
19-
20-
pub(crate) fn into_inner(self) -> loole::Receiver<T> {
16+
pub(crate) fn into_inner(self) -> kanal::AsyncReceiver<T> {
2117
self.0
2218
}
2319
}
2420

2521
pub(crate) fn create_channel<T: Clone>(buffer_size: usize) -> (Sender<T>, Receiver<T>) {
26-
let (tx, rx) = loole::bounded(buffer_size);
22+
let (tx, rx) = kanal::bounded_async::<T>(buffer_size);
2723
(Sender(tx), Receiver(rx))
2824
}
2925

@@ -36,7 +32,7 @@ pub(crate) fn create_ordering_aware_receiver_channel<T: Clone>(
3632
) -> (Vec<Sender<T>>, OrderingAwareReceiver<T>) {
3733
match ordered {
3834
true => {
39-
let (senders, receiver) = (0..buffer_size).map(|_| create_channel::<T>(1)).unzip();
35+
let (senders, receiver) = (0..buffer_size).map(|_| create_channel::<T>(0)).unzip();
4036
(
4137
senders,
4238
OrderingAwareReceiver::InOrder(RoundRobinReceiver::new(receiver)),

src/daft-local-execution/src/dispatcher.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ impl DispatchSpawner for RoundRobinDispatcher {
9595
runtime_handle: &mut RuntimeHandle,
9696
) -> SpawnedDispatchResult {
9797
let (worker_senders, worker_receivers): (Vec<_>, Vec<_>) =
98-
(0..num_workers).map(|_| create_channel(1)).unzip();
98+
(0..num_workers).map(|_| create_channel(0)).unzip();
9999
let morsel_size = self.morsel_size;
100100
let task = runtime_handle.spawn(async move {
101101
Self::dispatch_inner(worker_senders, input_receivers, morsel_size).await
@@ -213,7 +213,7 @@ impl DispatchSpawner for PartitionedDispatcher {
213213
runtime_handle: &mut RuntimeHandle,
214214
) -> SpawnedDispatchResult {
215215
let (worker_senders, worker_receivers): (Vec<_>, Vec<_>) =
216-
(0..num_workers).map(|_| create_channel(1)).unzip();
216+
(0..num_workers).map(|_| create_channel(0)).unzip();
217217
let partition_by = self.partition_by.clone();
218218
let dispatch_task = runtime_handle.spawn(async move {
219219
Self::dispatch_inner(worker_senders, input_receivers, partition_by).await

src/daft-local-execution/src/intermediate_ops/intermediate_op.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ impl PipelineNode for IntermediateNode {
212212
let num_workers = op.max_concurrency().context(PipelineExecutionSnafu {
213213
node_name: self.name(),
214214
})?;
215-
let (destination_sender, destination_receiver) = create_channel(1);
215+
let (destination_sender, destination_receiver) = create_channel(0);
216216
let counting_sender = CountingSender::new(destination_sender, self.runtime_stats.clone());
217217

218218
let dispatch_spawner = self
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::sync::Arc;
1+
use std::{cmp::max, sync::Arc};
22

33
use common_error::{DaftError, DaftResult};
44
use common_runtime::RuntimeRef;
@@ -12,15 +12,73 @@ use super::intermediate_op::{
1212
};
1313
use crate::NUM_CPUS;
1414

15+
fn num_parallel_exprs(projection: &[ExprRef]) -> usize {
16+
max(
17+
projection
18+
.iter()
19+
.map(|expr| expr.has_compute())
20+
.filter(|x| *x)
21+
.count(),
22+
1,
23+
)
24+
}
25+
1526
pub struct ProjectOperator {
1627
projection: Arc<Vec<ExprRef>>,
28+
max_concurrency: usize,
29+
parallel_exprs: usize,
1730
}
1831

1932
impl ProjectOperator {
20-
pub fn new(projection: Vec<ExprRef>) -> Self {
21-
Self {
33+
pub fn new(projection: Vec<ExprRef>) -> DaftResult<Self> {
34+
let (max_concurrency, parallel_exprs) = Self::get_optimal_allocation(&projection)?;
35+
Ok(Self {
2236
projection: Arc::new(projection),
23-
}
37+
max_concurrency,
38+
parallel_exprs,
39+
})
40+
}
41+
42+
// This function is used to determine the optimal allocation of concurrency and expression parallelism
43+
fn get_optimal_allocation(projection: &[ExprRef]) -> DaftResult<(usize, usize)> {
44+
let resource_request = get_resource_request(projection);
45+
// The number of CPUs available for the operator.
46+
let available_cpus = match resource_request {
47+
// If the resource request specifies a number of CPUs, the available cpus is the number of actual CPUs
48+
// divided by the requested number of CPUs, clamped to (1, NUM_CPUS).
49+
// E.g. if the resource request specifies 2 CPUs and NUM_CPUS is 4, the number of available cpus is 2.
50+
Some(resource_request) if resource_request.num_cpus().is_some() => {
51+
let requested_num_cpus = resource_request.num_cpus().unwrap();
52+
if requested_num_cpus > *NUM_CPUS as f64 {
53+
Err(DaftError::ValueError(format!(
54+
"Requested {} CPUs but found only {} available",
55+
requested_num_cpus, *NUM_CPUS
56+
)))
57+
} else {
58+
Ok(
59+
(*NUM_CPUS as f64 / requested_num_cpus).clamp(1.0, *NUM_CPUS as f64)
60+
as usize,
61+
)
62+
}
63+
}
64+
_ => Ok(*NUM_CPUS),
65+
}?;
66+
67+
let max_parallel_exprs = num_parallel_exprs(projection);
68+
69+
// Calculate optimal concurrency using ceiling division
70+
// Example: For 128 CPUs and 60 parallel expressions:
71+
// max_concurrency = (128 + 60 - 1) / 60 = 3 concurrent tasks
72+
// This ensures we never exceed max_parallel_exprs per task
73+
let max_concurrency = (available_cpus + max_parallel_exprs - 1) / max_parallel_exprs;
74+
75+
// Calculate actual parallel expressions per task using floor division
76+
// Example: For 128 CPUs and 3 concurrent tasks:
77+
// num_parallel_exprs = 128 / 3 = 42 parallel expressions per task
78+
// This ensures even distribution across concurrent tasks
79+
let num_parallel_exprs = available_cpus / max_concurrency;
80+
81+
Ok((max_concurrency, num_parallel_exprs))
2482
}
2583
}
2684

@@ -33,9 +91,16 @@ impl IntermediateOperator for ProjectOperator {
3391
runtime: &RuntimeRef,
3492
) -> IntermediateOpExecuteResult {
3593
let projection = self.projection.clone();
94+
let num_parallel_exprs = self.parallel_exprs;
3695
runtime
3796
.spawn(async move {
38-
let out = input.eval_expression_list(&projection)?;
97+
let out = if num_parallel_exprs > 1 {
98+
input
99+
.par_eval_expression_list(&projection, num_parallel_exprs)
100+
.await?
101+
} else {
102+
input.eval_expression_list(&projection)?
103+
};
39104
Ok((
40105
state,
41106
IntermediateOperatorResult::NeedMoreInput(Some(Arc::new(out))),
@@ -49,26 +114,6 @@ impl IntermediateOperator for ProjectOperator {
49114
}
50115

51116
fn max_concurrency(&self) -> DaftResult<usize> {
52-
let resource_request = get_resource_request(&self.projection);
53-
match resource_request {
54-
// If the resource request specifies a number of CPUs, the max concurrency is the number of CPUs
55-
// divided by the requested number of CPUs, clamped to (1, NUM_CPUS).
56-
// E.g. if the resource request specifies 2 CPUs and NUM_CPUS is 4, the max concurrency is 2.
57-
Some(resource_request) if resource_request.num_cpus().is_some() => {
58-
let requested_num_cpus = resource_request.num_cpus().unwrap();
59-
if requested_num_cpus > *NUM_CPUS as f64 {
60-
Err(DaftError::ValueError(format!(
61-
"Requested {} CPUs but found only {} available",
62-
requested_num_cpus, *NUM_CPUS
63-
)))
64-
} else {
65-
Ok(
66-
(*NUM_CPUS as f64 / requested_num_cpus).clamp(1.0, *NUM_CPUS as f64)
67-
as usize,
68-
)
69-
}
70-
}
71-
_ => Ok(*NUM_CPUS),
72-
}
117+
Ok(self.max_concurrency)
73118
}
74119
}

0 commit comments

Comments
 (0)