Skip to content

Commit d31fa42

Browse files
committed
feat: add debug spans for decoding requests
Closes: hyperium#1759
1 parent d312dcc commit d31fa42

File tree

1 file changed

+60
-13
lines changed

1 file changed

+60
-13
lines changed

tonic/src/codec/decode.rs

Lines changed: 60 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,48 @@ impl<T> Unpin for Streaming<T> {}
3838

3939
#[derive(Debug, Clone)]
4040
enum State {
41-
ReadHeader,
41+
ReadHeader {
42+
span: tracing::Span,
43+
},
4244
ReadBody {
45+
span: tracing::Span,
4346
compression: Option<CompressionEncoding>,
4447
len: usize,
4548
},
46-
Error(Status),
49+
Error(Box<Status>),
50+
}
51+
52+
impl State {
53+
fn read_header() -> Self {
54+
let span = tracing::debug_span!(
55+
"read_header",
56+
compression = tracing::field::Empty,
57+
body.bytes = tracing::field::Empty,
58+
);
59+
Self::ReadHeader { span }
60+
}
61+
62+
fn read_body(compression: Option<CompressionEncoding>, len: usize) -> Self {
63+
let span = tracing::debug_span!(
64+
"read_body",
65+
compression = compression.map(|c| c.as_str()),
66+
compressed.bytes = len,
67+
uncompressed.bytes = compression.is_none().then_some(len),
68+
);
69+
Self::ReadBody {
70+
span,
71+
compression,
72+
len,
73+
}
74+
}
75+
76+
fn span(&self) -> Option<&tracing::Span> {
77+
match self {
78+
Self::ReadHeader { span } => Some(span),
79+
Self::ReadBody { span, .. } => Some(span),
80+
Self::Error(_) => None,
81+
}
82+
}
4783
}
4884

4985
#[derive(Debug, PartialEq, Eq)]
@@ -125,7 +161,7 @@ impl<T> Streaming<T> {
125161
.map_frame(|frame| frame.map_data(|mut buf| buf.copy_to_bytes(buf.remaining())))
126162
.map_err(|err| Status::map_error(err.into()))
127163
.boxed_unsync(),
128-
state: State::ReadHeader,
164+
state: State::read_header(),
129165
direction,
130166
buf: BytesMut::with_capacity(buffer_size),
131167
trailers: None,
@@ -142,7 +178,8 @@ impl StreamingInner {
142178
&mut self,
143179
buffer_settings: BufferSettings,
144180
) -> Result<Option<DecodeBuf<'_>>, Status> {
145-
if let State::ReadHeader = self.state {
181+
if let State::ReadHeader { span } = &self.state {
182+
let _guard = span.enter();
146183
if self.buf.remaining() < HEADER_SIZE {
147184
return Ok(None);
148185
}
@@ -151,7 +188,8 @@ impl StreamingInner {
151188
0 => None,
152189
1 => {
153190
{
154-
if self.encoding.is_some() {
191+
if let Some(ce) = self.encoding {
192+
span.record("compression", ce.as_str());
155193
self.encoding
156194
} else {
157195
// https://grpc.github.io/grpc/core/md_doc_compression.html
@@ -177,6 +215,7 @@ impl StreamingInner {
177215
};
178216

179217
let len = self.buf.get_u32() as usize;
218+
span.record("body.bytes", len);
180219
let limit = self
181220
.max_message_size
182221
.unwrap_or(DEFAULT_MAX_RECV_MESSAGE_SIZE);
@@ -191,14 +230,19 @@ impl StreamingInner {
191230
}
192231

193232
self.buf.reserve(len);
233+
drop(_guard);
194234

195-
self.state = State::ReadBody {
196-
compression: compression_encoding,
197-
len,
198-
}
235+
self.state = State::read_body(compression_encoding, len)
199236
}
200237

201-
if let State::ReadBody { len, compression } = self.state {
238+
if let State::ReadBody {
239+
len,
240+
span,
241+
compression,
242+
} = &self.state
243+
{
244+
let (len, compression) = (*len, *compression);
245+
let _guard = span.enter();
202246
// if we haven't read enough of the message then return and keep
203247
// reading
204248
if self.buf.remaining() < len || self.buf.len() < len {
@@ -228,6 +272,7 @@ impl StreamingInner {
228272
return Err(Status::new(Code::Internal, message));
229273
}
230274
let decompressed_len = self.decompress_buf.len();
275+
span.record("uncompressed.bytes", decompressed_len);
231276
DecodeBuf::new(&mut self.decompress_buf, decompressed_len)
232277
} else {
233278
DecodeBuf::new(&mut self.buf, len)
@@ -241,14 +286,16 @@ impl StreamingInner {
241286

242287
// Returns Some(()) if data was found or None if the loop in `poll_next` should break
243288
fn poll_frame(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<()>, Status>> {
289+
let _guard = self.state.span().map(|s| s.enter());
244290
let chunk = match ready!(Pin::new(&mut self.body).poll_frame(cx)) {
245291
Some(Ok(d)) => Some(d),
246292
Some(Err(status)) => {
247293
if self.direction == Direction::Request && status.code() == Code::Cancelled {
248294
return Poll::Ready(Ok(None));
249295
}
250296

251-
let _ = std::mem::replace(&mut self.state, State::Error(status.clone()));
297+
drop(_guard);
298+
let _ = std::mem::replace(&mut self.state, State::Error(Box::new(status.clone())));
252299
debug!("decoder inner stream error: {:?}", status);
253300
return Poll::Ready(Err(status));
254301
}
@@ -378,7 +425,7 @@ impl<T> Streaming<T> {
378425
match self.inner.decode_chunk(self.decoder.buffer_settings())? {
379426
Some(mut decode_buf) => match self.decoder.decode(&mut decode_buf)? {
380427
Some(msg) => {
381-
self.inner.state = State::ReadHeader;
428+
self.inner.state = State::read_header();
382429
Ok(Some(msg))
383430
}
384431
None => Ok(None),
@@ -394,7 +441,7 @@ impl<T> Stream for Streaming<T> {
394441
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
395442
loop {
396443
if let State::Error(status) = &self.inner.state {
397-
return Poll::Ready(Some(Err(status.clone())));
444+
return Poll::Ready(Some(Err(*status.clone())));
398445
}
399446

400447
if let Some(item) = self.decode_chunk()? {

0 commit comments

Comments
 (0)