diff --git a/src/extensions/client/mod.rs b/src/extensions/client/mod.rs index 21fae1f..9213e9e 100644 --- a/src/extensions/client/mod.rs +++ b/src/extensions/client/mod.rs @@ -379,22 +379,33 @@ impl Client { } pub async fn request(&self, method: &str, params: Vec) -> Result { - let cx = TRACER.context(method.to_string()); - let (tx, rx) = tokio::sync::oneshot::channel(); - self.sender - .send(Message::Request { - method: method.into(), - params, - response: tx, - retries: self.retries, - }) - .await - .map_err(errors::internal_error)?; + async move { + let (tx, rx) = tokio::sync::oneshot::channel(); + self.sender + .send(Message::Request { + method: method.into(), + params, + response: tx, + retries: self.retries, + }) + .await + .map_err(errors::internal_error)?; + + let result = rx.await.map_err(errors::internal_error)?.map_err(errors::map_error); + + opentelemetry::trace::get_active_span(|span| match result.as_ref() { + Ok(_) => { + span.set_status(opentelemetry::trace::Status::Ok); + } + Err(err) => { + span.set_status(opentelemetry::trace::Status::error(err.to_string())); + } + }); - rx.with_context(cx) - .await - .map_err(errors::internal_error)? - .map_err(errors::map_error) + result + } + .with_context(TRACER.context(method.to_string())) + .await } pub async fn subscribe( @@ -403,21 +414,36 @@ impl Client { params: Vec, unsubscribe: &str, ) -> Result, Error> { - let cx = TRACER.context(subscribe.to_string()); - - let (tx, rx) = tokio::sync::oneshot::channel(); - self.sender - .send(Message::Subscribe { - subscribe: subscribe.into(), - params, - unsubscribe: unsubscribe.into(), - response: tx, - retries: self.retries, - }) - .await - .map_err(errors::failed)?; + async move { + let (tx, rx) = tokio::sync::oneshot::channel(); + self.sender + .send(Message::Subscribe { + subscribe: subscribe.into(), + params, + unsubscribe: unsubscribe.into(), + response: tx, + retries: self.retries, + }) + .await + .map_err(errors::failed)?; + + let result = rx.await.map_err(errors::failed)?; + + opentelemetry::trace::get_active_span(|span| match result.as_ref() { + Ok(_) => { + span.set_status(opentelemetry::trace::Status::Ok); + } + Err(err) => { + span.set_status(opentelemetry::trace::Status::Error { + description: std::borrow::Cow::from(err.to_string()), + }); + } + }); - rx.with_context(cx).await.map_err(errors::failed)? + result + } + .with_context(TRACER.context(subscribe.to_string())) + .await } pub async fn rotate_endpoint(&self) { diff --git a/src/middlewares/mod.rs b/src/middlewares/mod.rs index 0f22205..8a577be 100644 --- a/src/middlewares/mod.rs +++ b/src/middlewares/mod.rs @@ -150,10 +150,16 @@ impl Middlewares { tracing::error!("middlewares timeout: {req}"); + opentelemetry::trace::get_active_span(|span| { + span.set_status(opentelemetry::trace::Status::error("middlewares timeout")); + }); task_handle.abort(); } _ = &mut task_handle => { tracing::trace!("middlewares finished: {req}"); + opentelemetry::trace::get_active_span(|span| { + span.set_status(opentelemetry::trace::Status::Ok); + }); } } } diff --git a/src/server.rs b/src/server.rs index 95e4fab..d7d743d 100644 --- a/src/server.rs +++ b/src/server.rs @@ -73,10 +73,7 @@ pub async fn build(config: Config) -> anyhow::Result { module.register_async_method(method_name, move |params, _| { let method_middlewares = method_middlewares.clone(); - async move { - let cx = tracer.context(method_name); - let parsed = params.parse::()?; let params = if parsed == JsonValue::Null { vec![] @@ -89,13 +86,24 @@ pub async fn build(config: Config) -> anyhow::Result { method_middlewares .call(CallRequest::new(method_name, params), result_tx, timeout) - .with_context(cx) .await; - result_rx + let result = result_rx .await - .map_err(|_| errors::map_error(jsonrpsee::core::Error::RequestTimeout))? + .map_err(|_| errors::map_error(jsonrpsee::core::Error::RequestTimeout))?; + + opentelemetry::trace::get_active_span(|span| match result.as_ref() { + Ok(_) => { + span.set_status(opentelemetry::trace::Status::Ok); + } + Err(err) => { + span.set_status(opentelemetry::trace::Status::error(err.to_string())); + } + }); + + result } + .with_context(tracer.context(method_name)) })?; } @@ -126,10 +134,7 @@ pub async fn build(config: Config) -> anyhow::Result { unsubscribe_name, move |params, pending_sink, _| { let subscription_middlewares = subscription_middlewares.clone(); - async move { - let cx = tracer.context(name); - let parsed = params.parse::()?; let params = if parsed == JsonValue::Null { vec![] @@ -151,13 +156,24 @@ pub async fn build(config: Config) -> anyhow::Result { result_tx, timeout, ) - .with_context(cx) .await; - result_rx + let result = result_rx .await - .map_err(|_| errors::map_error(jsonrpsee::core::Error::RequestTimeout))? + .map_err(|_| errors::map_error(jsonrpsee::core::Error::RequestTimeout))?; + + opentelemetry::trace::get_active_span(|span| match result.as_ref() { + Ok(_) => { + span.set_status(opentelemetry::trace::Status::Ok); + } + Err(err) => { + span.set_status(opentelemetry::trace::Status::error(format!("{:?}", err))); + } + }); + + result } + .with_context(tracer.context(name)) }, )?; }