@@ -103,9 +103,7 @@ impl Packet {
103
103
Packet :: PubRel ( _) => 0b0110_0010 ,
104
104
Packet :: PubComp ( _) => 0b0111_0000 ,
105
105
Packet :: Subscribe ( _) => 0b1000_0010 ,
106
- Packet :: SubAck ( _) => {
107
- unreachable ! ( )
108
- }
106
+ Packet :: SubAck ( _) => 0b1001_0000 ,
109
107
Packet :: Unsubscribe ( _) => 0b1010_0010 ,
110
108
Packet :: UnsubAck ( _) => 0b1011_0000 ,
111
109
Packet :: PingReq => 0b1100_0000 ,
@@ -168,17 +166,20 @@ impl Packet {
168
166
p. wire_len ( ) . write_variable_integer ( buf) ?;
169
167
p. write ( buf) ?;
170
168
}
171
- Packet :: SubAck ( _) => {
172
- unreachable ! ( )
169
+ Packet :: SubAck ( p) => {
170
+ buf. put_u8 ( 0b1001_0000 ) ;
171
+ p. wire_len ( ) . write_variable_integer ( buf) ?;
172
+ p. write ( buf) ?;
173
173
}
174
174
Packet :: Unsubscribe ( p) => {
175
175
buf. put_u8 ( 0b1010_0010 ) ;
176
176
p. wire_len ( ) . write_variable_integer ( buf) ?;
177
177
p. write ( buf) ?;
178
178
}
179
- Packet :: UnsubAck ( _) => {
180
- unreachable ! ( ) ;
181
- // buf.put_u8(0b1011_0000);
179
+ Packet :: UnsubAck ( p) => {
180
+ buf. put_u8 ( 0b1011_0000 ) ;
181
+ p. wire_len ( ) . write_variable_integer ( buf) ?;
182
+ p. write ( buf) ?;
182
183
}
183
184
Packet :: PingReq => {
184
185
buf. put_u8 ( 0b1100_0000 ) ;
@@ -259,17 +260,20 @@ impl Packet {
259
260
written += p. wire_len ( ) . write_async_variable_integer ( stream) . await ?;
260
261
written += p. async_write ( stream) . await ?;
261
262
}
262
- Packet :: SubAck ( _) => {
263
- unreachable ! ( )
263
+ Packet :: SubAck ( p) => {
264
+ stream. write_u8 ( 0b1001_0000 ) . await ?;
265
+ written += p. wire_len ( ) . write_async_variable_integer ( stream) . await ?;
266
+ written += p. async_write ( stream) . await ?;
264
267
}
265
268
Packet :: Unsubscribe ( p) => {
266
269
stream. write_u8 ( 0b1010_0010 ) . await ?;
267
270
written += p. wire_len ( ) . write_async_variable_integer ( stream) . await ?;
268
271
written += p. async_write ( stream) . await ?;
269
272
}
270
- Packet :: UnsubAck ( _) => {
271
- unreachable ! ( ) ;
272
- // stream.write_u8(0b1011_0000).await?;
273
+ Packet :: UnsubAck ( p) => {
274
+ stream. write_u8 ( 0b1011_0000 ) . await ?;
275
+ written += p. wire_len ( ) . write_async_variable_integer ( stream) . await ?;
276
+ written += p. async_write ( stream) . await ?;
273
277
}
274
278
Packet :: PingReq => {
275
279
stream. write_u8 ( 0b1100_0000 ) . await ?;
@@ -396,6 +400,28 @@ impl Display for Packet {
396
400
}
397
401
}
398
402
403
+ impl WireLength for Packet {
404
+ fn wire_len ( & self ) -> usize {
405
+ match self {
406
+ Packet :: Connect ( p) => 1 + p. wire_len ( ) . variable_integer_len ( ) + p. wire_len ( ) ,
407
+ Packet :: ConnAck ( p) => 1 + p. wire_len ( ) . variable_integer_len ( ) + p. wire_len ( ) ,
408
+ Packet :: Publish ( p) => 1 + p. wire_len ( ) . variable_integer_len ( ) + p. wire_len ( ) ,
409
+ Packet :: PubAck ( p) => 1 + p. wire_len ( ) . variable_integer_len ( ) + p. wire_len ( ) ,
410
+ Packet :: PubRec ( p) => 1 + p. wire_len ( ) . variable_integer_len ( ) + p. wire_len ( ) ,
411
+ Packet :: PubRel ( p) => 1 + p. wire_len ( ) . variable_integer_len ( ) + p. wire_len ( ) ,
412
+ Packet :: PubComp ( p) => 1 + p. wire_len ( ) . variable_integer_len ( ) + p. wire_len ( ) ,
413
+ Packet :: Subscribe ( p) => 1 + p. wire_len ( ) . variable_integer_len ( ) + p. wire_len ( ) ,
414
+ Packet :: SubAck ( p) => 1 + p. wire_len ( ) . variable_integer_len ( ) + p. wire_len ( ) ,
415
+ Packet :: Unsubscribe ( p) => 1 + p. wire_len ( ) . variable_integer_len ( ) + p. wire_len ( ) ,
416
+ Packet :: UnsubAck ( p) => 1 + p. wire_len ( ) . variable_integer_len ( ) + p. wire_len ( ) ,
417
+ Packet :: PingReq => 2 ,
418
+ Packet :: PingResp => 2 ,
419
+ Packet :: Disconnect ( p) => 1 + p. wire_len ( ) . variable_integer_len ( ) + p. wire_len ( ) ,
420
+ Packet :: Auth ( p) => 1 + p. wire_len ( ) . variable_integer_len ( ) + p. wire_len ( ) ,
421
+ }
422
+ }
423
+ }
424
+
399
425
/// 2.1.2 MQTT Control Packet type
400
426
#[ derive( Debug , Clone , Copy , PartialEq , Eq , PartialOrd ) ]
401
427
pub enum PacketType {
@@ -447,33 +473,47 @@ impl std::fmt::Display for PacketType {
447
473
448
474
#[ cfg( test) ]
449
475
mod tests {
476
+
450
477
use bytes:: BytesMut ;
451
478
452
479
use crate :: packets:: Packet ;
453
480
454
481
use crate :: tests:: test_packets:: * ;
455
482
456
483
#[ rstest:: rstest]
457
- #[ case( ping_req_case( ) . 1 ) ]
458
- #[ case( ping_resp_case( ) . 1 ) ]
459
- #[ case( connack_case( ) . 1 ) ]
460
- #[ case( create_subscribe_packet( 1 ) ) ]
461
- #[ case( create_subscribe_packet( 65335 ) ) ]
462
- #[ case( create_puback_packet( 1 ) ) ]
463
- #[ case( create_puback_packet( 65335 ) ) ]
464
- #[ case( create_disconnect_packet( ) ) ]
465
- #[ case( create_connack_packet( true ) ) ]
466
- #[ case( create_connack_packet( false ) ) ]
467
- #[ case( publish_packet_1( ) ) ]
468
- #[ case( publish_packet_2( ) ) ]
469
- #[ case( publish_packet_3( ) ) ]
470
- #[ case( publish_packet_4( ) ) ]
471
- #[ case( create_empty_publish_packet( ) ) ]
484
+ #[ case:: ping_req_case( ping_req_case( ) . 1 ) ]
485
+ #[ case:: ping_resp_case( ping_resp_case( ) . 1 ) ]
486
+ #[ case:: connack_case( connack_case( ) . 1 ) ]
487
+ #[ case:: create_subscribe_packet( create_subscribe_packet( 1 ) ) ]
488
+ #[ case:: create_subscribe_packet( create_subscribe_packet( 65335 ) ) ]
489
+ #[ case:: create_puback_packet( create_puback_packet( 1 ) ) ]
490
+ #[ case:: create_puback_packet( create_puback_packet( 65335 ) ) ]
491
+ #[ case:: create_disconnect_packet( create_disconnect_packet( ) ) ]
492
+ #[ case:: create_connack_packet( create_connack_packet( true ) ) ]
493
+ #[ case:: create_connack_packet( create_connack_packet( false ) ) ]
494
+ #[ case:: publish_packet_1( publish_packet_1( ) ) ]
495
+ #[ case:: publish_packet_2( publish_packet_2( ) ) ]
496
+ #[ case:: publish_packet_3( publish_packet_3( ) ) ]
497
+ #[ case:: publish_packet_4( publish_packet_4( ) ) ]
498
+ #[ case:: create_empty_publish_packet( create_empty_publish_packet( ) ) ]
499
+ #[ case:: subscribe( subscribe_case( ) ) ]
500
+ #[ case:: suback( suback_case( ) ) ]
501
+ #[ case:: unsubscribe( unsubscribe_case( ) ) ]
502
+ #[ case:: unsuback( unsuback_case( ) ) ]
472
503
fn test_write_read_write_read_cases ( #[ case] packet : Packet ) {
504
+ use crate :: packets:: WireLength ;
505
+
473
506
let mut buffer = BytesMut :: new ( ) ;
507
+
474
508
packet. write ( & mut buffer) . unwrap ( ) ;
509
+
510
+ let wire_len = packet. wire_len ( ) ;
511
+ assert_eq ! ( wire_len, buffer. len( ) ) ;
512
+
475
513
let res1 = Packet :: read ( & mut buffer) . unwrap ( ) ;
476
514
515
+ assert_eq ! ( packet, res1) ;
516
+
477
517
let mut buffer = BytesMut :: new ( ) ;
478
518
res1. write ( & mut buffer) . unwrap ( ) ;
479
519
let res2 = Packet :: read ( & mut buffer) . unwrap ( ) ;
@@ -533,6 +573,43 @@ mod tests {
533
573
assert_eq ! ( out, input)
534
574
}
535
575
576
+ #[ rstest:: rstest]
577
+ #[ case:: ping_req_case( ping_req_case( ) . 1 ) ]
578
+ #[ case:: ping_resp_case( ping_resp_case( ) . 1 ) ]
579
+ #[ case:: connack_case( connack_case( ) . 1 ) ]
580
+ #[ case:: create_subscribe_packet( create_subscribe_packet( 1 ) ) ]
581
+ #[ case:: create_subscribe_packet( create_subscribe_packet( 65335 ) ) ]
582
+ #[ case:: create_puback_packet( create_puback_packet( 1 ) ) ]
583
+ #[ case:: create_puback_packet( create_puback_packet( 65335 ) ) ]
584
+ #[ case:: create_disconnect_packet( create_disconnect_packet( ) ) ]
585
+ #[ case:: create_connack_packet( create_connack_packet( true ) ) ]
586
+ #[ case:: create_connack_packet( create_connack_packet( false ) ) ]
587
+ #[ case:: publish_packet_1( publish_packet_1( ) ) ]
588
+ #[ case:: publish_packet_2( publish_packet_2( ) ) ]
589
+ #[ case:: publish_packet_3( publish_packet_3( ) ) ]
590
+ #[ case:: publish_packet_4( publish_packet_4( ) ) ]
591
+ #[ case:: create_empty_publish_packet( create_empty_publish_packet( ) ) ]
592
+ #[ case:: subscribe( subscribe_case( ) ) ]
593
+ #[ case:: suback( suback_case( ) ) ]
594
+ #[ case:: unsubscribe( unsubscribe_case( ) ) ]
595
+ #[ case:: unsuback( unsuback_case( ) ) ]
596
+ #[ tokio:: test]
597
+ async fn test_async_write_read_write_read_cases ( #[ case] packet : Packet ) {
598
+ use crate :: packets:: WireLength ;
599
+
600
+ let mut buffer = Vec :: with_capacity ( 1000 ) ;
601
+ packet. async_write ( & mut buffer) . await . unwrap ( ) ;
602
+
603
+ let wire_len = packet. wire_len ( ) ;
604
+ assert_eq ! ( wire_len, buffer. len( ) ) ;
605
+
606
+ let mut buf = buffer. as_slice ( ) ;
607
+
608
+ let res1 = Packet :: async_read ( & mut buf) . await . unwrap ( ) ;
609
+
610
+ assert_eq ! ( packet, res1) ;
611
+ }
612
+
536
613
// #[rstest::rstest]
537
614
// #[case(&[59, 1, 0, 59])]
538
615
// #[case(&[16, 14, 0, 4, 77, 81, 84, 84, 5, 247, 247, 252, 1, 17, 247, 247, 247])]
0 commit comments