@@ -35,16 +35,47 @@ struct StreamingInner {
35
35
36
36
impl < T > Unpin for Streaming < T > { }
37
37
38
- #[ derive( Debug , Clone , Copy ) ]
38
+ #[ derive( Debug , Clone ) ]
39
39
enum State {
40
- ReadHeader ,
40
+ ReadHeader {
41
+ span : Option < tracing:: Span > ,
42
+ } ,
41
43
ReadBody {
44
+ span : tracing:: Span ,
42
45
compression : Option < CompressionEncoding > ,
43
46
len : usize ,
44
47
} ,
45
48
Error ,
46
49
}
47
50
51
+ impl State {
52
+ fn read_header ( ) -> Self {
53
+ Self :: ReadHeader { span : None }
54
+ }
55
+
56
+ fn read_body ( compression : Option < CompressionEncoding > , len : usize ) -> Self {
57
+ let span = tracing:: debug_span!(
58
+ "read_body" ,
59
+ compression = compression. map( |c| c. as_str( ) ) ,
60
+ compressed. bytes = compression. is_some( ) . then_some( len) ,
61
+ uncompressed. bytes = compression. is_none( ) . then_some( len) ,
62
+ ) ;
63
+ Self :: ReadBody {
64
+ span,
65
+ compression,
66
+ len,
67
+ }
68
+ }
69
+
70
+ fn span ( & self ) -> Option < & tracing:: Span > {
71
+ match self {
72
+ Self :: ReadHeader { span } => span. as_ref ( ) ,
73
+ Self :: ReadBody { span, .. } => Some ( span) ,
74
+ Self :: Error => None ,
75
+ }
76
+ }
77
+ }
78
+
48
79
#[ derive( Debug , PartialEq , Eq ) ]
49
80
enum Direction {
50
81
Request ,
@@ -124,7 +155,7 @@ impl<T> Streaming<T> {
124
155
. map_data ( |mut buf| buf. copy_to_bytes ( buf. remaining ( ) ) )
125
156
. map_err ( |err| Status :: map_error ( err. into ( ) ) )
126
157
. boxed_unsync ( ) ,
127
- state : State :: ReadHeader ,
158
+ state : State :: read_header ( ) ,
128
159
direction,
129
160
buf : BytesMut :: with_capacity ( buffer_size) ,
130
161
trailers : None ,
@@ -141,7 +172,15 @@ impl StreamingInner {
141
172
& mut self ,
142
173
buffer_settings : BufferSettings ,
143
174
) -> Result < Option < DecodeBuf < ' _ > > , Status > {
144
- if let State :: ReadHeader = self . state {
175
+ if let State :: ReadHeader { span } = & mut self . state {
176
+ let span = span. get_or_insert_with ( || {
177
+ tracing:: debug_span!(
178
+ "read_header" ,
179
+ compression = tracing:: field:: Empty ,
180
+ body. bytes = tracing:: field:: Empty ,
181
+ )
182
+ } ) ;
183
+ let _guard = span. enter ( ) ;
145
184
if self . buf . remaining ( ) < HEADER_SIZE {
146
185
return Ok ( None ) ;
147
186
}
@@ -150,7 +189,8 @@ impl StreamingInner {
150
189
0 => None ,
151
190
1 => {
152
191
{
153
- if self . encoding . is_some ( ) {
192
+ if let Some ( ce) = self . encoding {
193
+ span. record ( "compression" , ce. as_str ( ) ) ;
154
194
self . encoding
155
195
} else {
156
196
// https://grpc.github.io/grpc/core/md_doc_compression.html
@@ -176,6 +216,7 @@ impl StreamingInner {
176
216
} ;
177
217
178
218
let len = self . buf . get_u32 ( ) as usize ;
219
+ span. record ( "body.bytes" , len) ;
179
220
let limit = self
180
221
. max_message_size
181
222
. unwrap_or ( DEFAULT_MAX_RECV_MESSAGE_SIZE ) ;
@@ -190,14 +231,19 @@ impl StreamingInner {
190
231
}
191
232
192
233
self . buf . reserve ( len) ;
234
+ drop ( _guard) ;
193
235
194
- self . state = State :: ReadBody {
195
- compression : compression_encoding,
196
- len,
197
- }
236
+ self . state = State :: read_body ( compression_encoding, len)
198
237
}
199
238
200
- if let State :: ReadBody { len, compression } = self . state {
239
+ if let State :: ReadBody {
240
+ len,
241
+ span,
242
+ compression,
243
+ } = & self . state
244
+ {
245
+ let ( len, compression) = ( * len, * compression) ;
246
+ let _guard = span. enter ( ) ;
201
247
// if we haven't read enough of the message then return and keep
202
248
// reading
203
249
if self . buf . remaining ( ) < len || self . buf . len ( ) < len {
@@ -227,6 +273,7 @@ impl StreamingInner {
227
273
return Err ( Status :: new ( Code :: Internal , message) ) ;
228
274
}
229
275
let decompressed_len = self . decompress_buf . len ( ) ;
276
+ span. record ( "uncompressed.bytes" , decompressed_len) ;
230
277
DecodeBuf :: new ( & mut self . decompress_buf , decompressed_len)
231
278
} else {
232
279
DecodeBuf :: new ( & mut self . buf , len)
@@ -240,13 +287,15 @@ impl StreamingInner {
240
287
241
288
// Returns Some(()) if data was found or None if the loop in `poll_next` should break
242
289
fn poll_data ( & mut self , cx : & mut Context < ' _ > ) -> Poll < Result < Option < ( ) > , Status > > {
290
+ let _guard = self . state . span ( ) . map ( |s| s. enter ( ) ) ;
243
291
let chunk = match ready ! ( Pin :: new( & mut self . body) . poll_data( cx) ) {
244
292
Some ( Ok ( d) ) => Some ( d) ,
245
293
Some ( Err ( status) ) => {
246
294
if self . direction == Direction :: Request && status. code ( ) == Code :: Cancelled {
247
295
return Poll :: Ready ( Ok ( None ) ) ;
248
296
}
249
297
298
+ drop ( _guard) ;
250
299
let _ = std:: mem:: replace ( & mut self . state , State :: Error ) ;
251
300
debug ! ( "decoder inner stream error: {:?}" , status) ;
252
301
return Poll :: Ready ( Err ( status) ) ;
@@ -376,7 +425,7 @@ impl<T> Streaming<T> {
376
425
match self . inner . decode_chunk ( self . decoder . buffer_settings ( ) ) ? {
377
426
Some ( mut decode_buf) => match self . decoder . decode ( & mut decode_buf) ? {
378
427
Some ( msg) => {
379
- self . inner . state = State :: ReadHeader ;
428
+ self . inner . state = State :: read_header ( ) ;
380
429
Ok ( Some ( msg) )
381
430
}
382
431
None => Ok ( None ) ,
0 commit comments