Skip to content

Commit 9370990

Browse files
committed
feat: add debug spans for decoding requests
Closes: hyperium#1759
1 parent c783652 commit 9370990

File tree

2 files changed

+62
-16
lines changed

2 files changed

+62
-16
lines changed

tonic/src/codec/compression.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -160,9 +160,8 @@ impl CompressionEncoding {
160160
}
161161

162162
#[allow(missing_docs)]
163-
#[cfg(any(feature = "gzip", feature = "zstd"))]
164-
pub(crate) fn as_str(&self) -> &'static str {
165-
match self {
163+
pub(crate) const fn as_str(&self) -> &'static str {
164+
match *self {
166165
#[cfg(feature = "gzip")]
167166
CompressionEncoding::Gzip => "gzip",
168167
#[cfg(feature = "zstd")]
@@ -171,11 +170,11 @@ impl CompressionEncoding {
171170
}
172171

173172
#[cfg(any(feature = "gzip", feature = "zstd"))]
174-
pub(crate) fn into_header_value(self) -> http::HeaderValue {
173+
pub(crate) const fn into_header_value(self) -> http::HeaderValue {
175174
http::HeaderValue::from_static(self.as_str())
176175
}
177176

178-
pub(crate) fn encodings() -> &'static [Self] {
177+
pub(crate) const fn encodings() -> &'static [Self] {
179178
&[
180179
#[cfg(feature = "gzip")]
181180
CompressionEncoding::Gzip,

tonic/src/codec/decode.rs

Lines changed: 58 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,16 +35,52 @@ struct StreamingInner {
3535

3636
impl<T> Unpin for Streaming<T> {}
3737

38-
#[derive(Debug, Clone, Copy)]
38+
#[derive(Debug, Clone)]
3939
enum State {
40-
ReadHeader,
40+
ReadHeader {
41+
span: tracing::Span,
42+
},
4143
ReadBody {
44+
span: tracing::Span,
4245
compression: Option<CompressionEncoding>,
4346
len: usize,
4447
},
4548
Error,
4649
}
4750

51+
impl State {
52+
fn read_header() -> Self {
53+
let span = tracing::debug_span!(
54+
"read_header",
55+
compression = tracing::field::Empty,
56+
body.bytes = tracing::field::Empty,
57+
);
58+
Self::ReadHeader { span }
59+
}
60+
61+
fn read_body(compression: Option<CompressionEncoding>, len: usize) -> Self {
62+
let span = tracing::debug_span!(
63+
"read_body",
64+
compression = compression.map(|c| c.as_str()),
65+
compressed.bytes = compression.is_some().then_some(len),
66+
uncompressed.bytes = compression.is_none().then_some(len),
67+
);
68+
Self::ReadBody {
69+
span,
70+
compression,
71+
len,
72+
}
73+
}
74+
75+
fn span(&self) -> Option<&tracing::Span> {
76+
match self {
77+
Self::ReadHeader { span } => Some(span),
78+
Self::ReadBody { span, .. } => Some(span),
79+
Self::Error => None,
80+
}
81+
}
82+
}
83+
4884
#[derive(Debug, PartialEq, Eq)]
4985
enum Direction {
5086
Request,
@@ -124,7 +160,7 @@ impl<T> Streaming<T> {
124160
.map_data(|mut buf| buf.copy_to_bytes(buf.remaining()))
125161
.map_err(|err| Status::map_error(err.into()))
126162
.boxed_unsync(),
127-
state: State::ReadHeader,
163+
state: State::read_header(),
128164
direction,
129165
buf: BytesMut::with_capacity(buffer_size),
130166
trailers: None,
@@ -141,7 +177,8 @@ impl StreamingInner {
141177
&mut self,
142178
buffer_settings: BufferSettings,
143179
) -> Result<Option<DecodeBuf<'_>>, Status> {
144-
if let State::ReadHeader = self.state {
180+
if let State::ReadHeader { span } = &self.state {
181+
let _guard = span.enter();
145182
if self.buf.remaining() < HEADER_SIZE {
146183
return Ok(None);
147184
}
@@ -150,7 +187,8 @@ impl StreamingInner {
150187
0 => None,
151188
1 => {
152189
{
153-
if self.encoding.is_some() {
190+
if let Some(ce) = self.encoding {
191+
span.record("compression", ce.as_str());
154192
self.encoding
155193
} else {
156194
// https://grpc.github.io/grpc/core/md_doc_compression.html
@@ -176,6 +214,7 @@ impl StreamingInner {
176214
};
177215

178216
let len = self.buf.get_u32() as usize;
217+
span.record("body.bytes", len);
179218
let limit = self
180219
.max_message_size
181220
.unwrap_or(DEFAULT_MAX_RECV_MESSAGE_SIZE);
@@ -190,14 +229,19 @@ impl StreamingInner {
190229
}
191230

192231
self.buf.reserve(len);
232+
drop(_guard);
193233

194-
self.state = State::ReadBody {
195-
compression: compression_encoding,
196-
len,
197-
}
234+
self.state = State::read_body(compression_encoding, len)
198235
}
199236

200-
if let State::ReadBody { len, compression } = self.state {
237+
if let State::ReadBody {
238+
len,
239+
span,
240+
compression,
241+
} = &self.state
242+
{
243+
let (len, compression) = (*len, *compression);
244+
let _guard = span.enter();
201245
// if we haven't read enough of the message then return and keep
202246
// reading
203247
if self.buf.remaining() < len || self.buf.len() < len {
@@ -227,6 +271,7 @@ impl StreamingInner {
227271
return Err(Status::new(Code::Internal, message));
228272
}
229273
let decompressed_len = self.decompress_buf.len();
274+
span.record("uncompressed.bytes", decompressed_len);
230275
DecodeBuf::new(&mut self.decompress_buf, decompressed_len)
231276
} else {
232277
DecodeBuf::new(&mut self.buf, len)
@@ -240,13 +285,15 @@ impl StreamingInner {
240285

241286
// Returns Some(()) if data was found or None if the loop in `poll_next` should break
242287
fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<()>, Status>> {
288+
let _guard = self.state.span().map(|s| s.enter());
243289
let chunk = match ready!(Pin::new(&mut self.body).poll_data(cx)) {
244290
Some(Ok(d)) => Some(d),
245291
Some(Err(status)) => {
246292
if self.direction == Direction::Request && status.code() == Code::Cancelled {
247293
return Poll::Ready(Ok(None));
248294
}
249295

296+
drop(_guard);
250297
let _ = std::mem::replace(&mut self.state, State::Error);
251298
debug!("decoder inner stream error: {:?}", status);
252299
return Poll::Ready(Err(status));
@@ -376,7 +423,7 @@ impl<T> Streaming<T> {
376423
match self.inner.decode_chunk(self.decoder.buffer_settings())? {
377424
Some(mut decode_buf) => match self.decoder.decode(&mut decode_buf)? {
378425
Some(msg) => {
379-
self.inner.state = State::ReadHeader;
426+
self.inner.state = State::read_header();
380427
Ok(Some(msg))
381428
}
382429
None => Ok(None),

0 commit comments

Comments
 (0)