Skip to content

Commit

Permalink
Fix issues with streams stalling (#65)
Browse files Browse the repository at this point in the history
* add option for tracing

* bump max request/response size

* use while let

* fix unyielding streams

* introduce max_rpc_payload_size

---------

Co-authored-by: Seun Lanlege <seun@polytope.technology>
  • Loading branch information
Wizdave97 and seunlanlege authored Mar 7, 2024
1 parent ff95ffb commit 1c90b8a
Show file tree
Hide file tree
Showing 12 changed files with 314 additions and 106 deletions.
167 changes: 164 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion client/src/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ macro_rules! chain {
async fn state_machine_update_notification(
&self,
counterparty_state_id: ismp::consensus::StateMachineId,
) -> Result<primitives::BoxStream<primitives::StateMachineUpdated>, anyhow::Error> {
) -> Result<primitives::BoxStream<primitives::StreamItem>, anyhow::Error> {
match self {
$(
$(#[$($meta)*])*
Expand Down
16 changes: 11 additions & 5 deletions consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,30 @@ where
let task_a = {
let chain_a = chain_a.clone();
let chain_b = chain_b.clone();
Box::pin(handle_notification(chain_a, chain_b))
tokio::spawn(async move {
let _ = handle_notification(chain_a, chain_b).await?;
Ok::<_, anyhow::Error>(())
})
};

let task_b = {
let chain_a = chain_a.clone();
let chain_b = chain_b.clone();
Box::pin(handle_notification(chain_b, chain_a))
tokio::spawn(async move {
let _ = handle_notification(chain_b, chain_a).await?;
Ok::<_, anyhow::Error>(())
})
};

// if one task completes, abort the other
tokio::select! {
result_a = task_a => {
result_a?
result_a??
}
result_b = task_b => {
result_b?
result_b??
}
};
}

Ok(())
}
Expand Down
Loading

0 comments on commit 1c90b8a

Please sign in to comment.