Skip to content

Commit

Permalink
Use actual runtime instead of a single thread
Browse files Browse the repository at this point in the history
  • Loading branch information
akoshelev committed Dec 14, 2024
1 parent 1f02e53 commit c3a74d5
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 31 deletions.
36 changes: 20 additions & 16 deletions ipa-core/src/query/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,22 +165,26 @@ pub fn execute<R: PrivateKeyRegistry>(
)
},
),
(QueryType::MaliciousHybrid(ipa_config), _) => do_query(
runtime,
config,
gateway,
input,
move |prss, gateway, config, input| {
Box::pin(execute_hybrid_protocol(
prss,
gateway,
input,
ipa_config,
config,
key_registry,
))
},
),
(QueryType::MaliciousHybrid(ipa_config), _) => {
let protocol_runtime = runtime.clone();
do_query(
runtime,
config,
gateway,
input,
move |prss, gateway, config, input| {
Box::pin(execute_hybrid_protocol(
prss,
gateway,
input,
ipa_config,
config,
key_registry,
protocol_runtime,
))
},
)
}
}
}

Expand Down
35 changes: 20 additions & 15 deletions ipa-core/src/query/runner/hybrid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use generic_array::ArrayLength;
use super::QueryResult;
use crate::{
error::{Error, LengthError},
executor::IpaRuntime,
ff::{
boolean::Boolean,
boolean_array::{BooleanArray, BA3, BA32, BA8},
Expand Down Expand Up @@ -54,15 +55,17 @@ use crate::{
pub struct Query<C, HV, R: PrivateKeyRegistry> {
config: HybridQueryParams,
key_registry: Arc<R>,
ipa_runtime: IpaRuntime,
phantom_data: PhantomData<(C, HV)>,
}

#[allow(dead_code)]
impl<C, HV, R: PrivateKeyRegistry> Query<C, HV, R> {
pub fn new(query_params: HybridQueryParams, key_registry: Arc<R>) -> Self {
pub fn new(query_params: HybridQueryParams, key_registry: Arc<R>, runtime: IpaRuntime) -> Self {
Self {
config: query_params,
key_registry,
ipa_runtime: runtime,
phantom_data: PhantomData,
}
}
Expand Down Expand Up @@ -104,10 +107,11 @@ where
let Self {
config,
key_registry,
ipa_runtime,
phantom_data: _,
} = self;

tracing::info!("New hybrid query: {config:?}");
tracing::info!("New hybrid query: {config:?} with parallel decryption");
let ctx = ctx.narrow(&Hybrid);
let sz = usize::from(query_size);

Expand All @@ -123,21 +127,21 @@ where
.try_flatten()
.map(|enc_report| {
let key_registry = Arc::clone(&key_registry);
let ipa_runtime = ipa_runtime.clone();
async move {
match enc_report {
Ok(enc_report) => {
let r = tokio::spawn({
async move {
let dec_report = enc_report
.decrypt(key_registry.as_ref())
.map_err(Into::<Error>::into);
let unique_tag = UniqueTag::from_unique_bytes(&enc_report);
dec_report.map(|dec_report1| (dec_report1, unique_tag))
}
})
.await
.map_err(Into::<Error>::into)?;
r
ipa_runtime
.spawn({
async move {
let dec_report = enc_report
.decrypt(key_registry.as_ref())
.map_err(Into::<Error>::into);
let unique_tag = UniqueTag::from_unique_bytes(&enc_report);
dec_report.map(|dec_report1| (dec_report1, unique_tag))
}
})
.await
}
Err(e) => Err(e),
}
Expand Down Expand Up @@ -187,6 +191,7 @@ pub async fn execute_hybrid_protocol<'a, R: PrivateKeyRegistry>(
ipa_config: HybridQueryParams,
config: &QueryConfig,
key_registry: Arc<R>,
ipa_runtime: IpaRuntime,
) -> QueryResult {
let gate = Gate::default();
let cross_shard_prss =
Expand All @@ -200,7 +205,7 @@ pub async fn execute_hybrid_protocol<'a, R: PrivateKeyRegistry>(
let ctx = ShardedMaliciousContext::new_with_gate(prss, gateway, gate, sharded);

Ok(Box::new(
Query::<_, BA32, R>::new(ipa_config, key_registry)
Query::<_, BA32, R>::new(ipa_config, key_registry, ipa_runtime)
.execute(ctx, config.size, input)
.await?,
))
Expand Down

0 comments on commit c3a74d5

Please sign in to comment.