Skip to content

Commit

Permalink
middlewares spawn task and cancel on timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
ermalkaleci committed Nov 2, 2023
1 parent 6c99906 commit 2294246
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 36 deletions.
2 changes: 1 addition & 1 deletion src/extensions/api/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ async fn rotate_endpoint_on_head_mismatch() {
}

// wait a bit to process tasks
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
tokio::time::sleep(std::time::Duration::from_millis(20)).await;

assert!(head_sub.sink.is_closed());
assert!(finalized_head_sub.sink.is_closed());
Expand Down
30 changes: 26 additions & 4 deletions src/middleware/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use async_trait::async_trait;
use futures::{future::BoxFuture, FutureExt};
use std::sync::Arc;
use std::{fmt::Debug, sync::Arc};

pub use crate::{
config::{RpcMethod, RpcSubscription},
Expand Down Expand Up @@ -34,15 +34,20 @@ impl<Request, Result> Clone for Middlewares<Request, Result> {
}
}

impl<Request: Send + 'static, Result: 'static> Middlewares<Request, Result> {
impl<Request: Debug + Send + 'static, Result: Send + 'static> Middlewares<Request, Result> {
pub fn new(
middlewares: Vec<Arc<dyn Middleware<Request, Result>>>,
fallback: Arc<dyn Fn(Request, TypeRegistry) -> BoxFuture<'static, Result> + Send + Sync>,
) -> Self {
Self { middlewares, fallback }
}

pub async fn call(&self, request: Request) -> Result {
pub async fn call(
&self,
request: Request,
result_tx: tokio::sync::oneshot::Sender<Result>,
timeout: tokio::time::Duration,
) {
let iter = self.middlewares.iter().rev();
let fallback = self.fallback.clone();
let mut next: Box<dyn FnOnce(Request, TypeRegistry) -> BoxFuture<'static, Result> + Send + Sync> =
Expand All @@ -55,6 +60,23 @@ impl<Request: Send + 'static, Result: 'static> Middlewares<Request, Result> {
Box::new(move |request, context| async move { middleware.call(request, context, next2).await }.boxed());
}

(next)(request, TypeRegistry::new()).await
let req = format!("{:?}", request);

let mut task_handle = tokio::spawn(async move {
let result = (next)(request, TypeRegistry::new()).await;
_ = result_tx.send(result);
});

let sleep = tokio::time::sleep(timeout);

tokio::select! {
_ = sleep => {
tracing::error!("middlewares timeout: {req}");
task_handle.abort();
}
_ = &mut task_handle => {
tracing::trace!("middlewares finished: {req}");
}
}
}
}
62 changes: 31 additions & 31 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,17 @@ pub async fn start_server(config: Config) -> anyhow::Result<(SocketAddr, ServerH
parsed.as_array().ok_or_else(|| errors::invalid_params(""))?.to_owned()
};

let request = method_middlewares
.call(CallRequest::new(method_name, params))
.with_context(cx);

let sleep = tokio::time::sleep(tokio::time::Duration::from_secs(request_timeout_seconds));

tokio::select! {
result = request => result,
_ = sleep => {
tracing::debug!("CallRequest timeout");
Ok::<_, ErrorObjectOwned>(Err(errors::map_error(jsonrpsee::core::Error::RequestTimeout))?)
}
}
let (result_tx, result_rx) = tokio::sync::oneshot::channel();
let timeout = tokio::time::Duration::from_secs(request_timeout_seconds);

method_middlewares
.call(CallRequest::new(method_name, params), result_tx, timeout)
.with_context(cx)
.await;

result_rx
.await
.map_err(|_| errors::map_error(jsonrpsee::core::Error::RequestTimeout))?
}
})?;
}
Expand Down Expand Up @@ -131,24 +129,26 @@ pub async fn start_server(config: Config) -> anyhow::Result<(SocketAddr, ServerH
parsed.as_array().ok_or_else(|| errors::invalid_params(""))?.to_owned()
};

let subscribe = subscription_middlewares
.call(SubscriptionRequest {
subscribe: subscribe_name.into(),
params,
unsubscribe: unsubscribe_name.into(),
sink,
})
.with_context(cx);

let sleep = tokio::time::sleep(tokio::time::Duration::from_secs(request_timeout_seconds));

tokio::select! {
result = subscribe => result,
_ = sleep => {
tracing::debug!("SubscriptionRequest timeout");
Ok(Err(errors::map_error(jsonrpsee::core::Error::RequestTimeout))?)
}
}
let (result_tx, result_rx) = tokio::sync::oneshot::channel();
let timeout = tokio::time::Duration::from_secs(request_timeout_seconds);

subscription_middlewares
.call(
SubscriptionRequest {
subscribe: subscribe_name.into(),
params,
unsubscribe: unsubscribe_name.into(),
sink,
},
result_tx,
timeout,
)
.with_context(cx)
.await;

result_rx
.await
.map_err(|_| errors::map_error(jsonrpsee::core::Error::RequestTimeout))?
}
})?;
}
Expand Down

0 comments on commit 2294246

Please sign in to comment.