Skip to content

Commit 1b9bfc5

Browse files
add suback and unsuback test cases
1 parent fd71dfd commit 1b9bfc5

File tree

14 files changed

+322
-175
lines changed

14 files changed

+322
-175
lines changed

mqrstt/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,7 @@ mod tokio_lib_test {
409409
network.connect(stream, &mut pingresp).await.unwrap();
410410

411411
let network_handle = tokio::task::spawn(async move {
412-
let result = network.run(&mut pingresp).await;
412+
let _result = network.run(&mut pingresp).await;
413413
// check result and or restart the connection
414414
pingresp
415415
});

mqrstt/src/packets/auth/mod.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,9 @@ where
4848
S: tokio::io::AsyncWrite + Unpin,
4949
{
5050
async fn async_write(&self, stream: &mut S) -> Result<usize, crate::packets::error::WriteError> {
51-
let reason_code_writen = self.reason_code.async_write(stream).await?;
52-
let properties_writen = self.properties.async_write(stream).await?;
53-
Ok(reason_code_writen + properties_writen)
51+
let reason_code_written = self.reason_code.async_write(stream).await?;
52+
let properties_written = self.properties.async_write(stream).await?;
53+
Ok(reason_code_written + properties_written)
5454
}
5555
}
5656

mqrstt/src/packets/connack/mod.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -85,11 +85,11 @@ where
8585
{
8686
async fn async_write(&self, stream: &mut S) -> Result<usize, crate::packets::error::WriteError> {
8787
use crate::packets::mqtt_trait::MqttAsyncWrite;
88-
let connack_flags_writen = self.connack_flags.async_write(stream).await?;
89-
let reason_code_writen = self.reason_code.async_write(stream).await?;
90-
let connack_properties_writen = self.connack_properties.async_write(stream).await?;
88+
let connack_flags_written = self.connack_flags.async_write(stream).await?;
89+
let reason_code_written = self.reason_code.async_write(stream).await?;
90+
let connack_properties_written = self.connack_properties.async_write(stream).await?;
9191

92-
Ok(connack_flags_writen + reason_code_writen + connack_properties_writen)
92+
Ok(connack_flags_written + reason_code_written + connack_properties_written)
9393
}
9494
}
9595

mqrstt/src/packets/connect/mod.rs

+8-8
Original file line numberDiff line numberDiff line change
@@ -223,12 +223,12 @@ where
223223
use crate::packets::mqtt_trait::MqttAsyncWrite;
224224
use tokio::io::AsyncWriteExt;
225225
async move {
226-
let mut total_writen_bytes = 6 // protocol header
226+
let mut total_written_bytes = 6 // protocol header
227227
+ 1 // protocol version
228228
+ 1 // connect flags
229229
+ 2; // keep alive
230230
let protocol = [0x00, 0x04, b'M', b'Q', b'T', b'T'];
231-
// We allready start with 6 as total writen bytes thus dont add anymore
231+
// We allready start with 6 as total written bytes thus dont add anymore
232232
stream.write_all(&protocol).await?;
233233

234234
self.protocol_version.async_write(stream).await?;
@@ -250,21 +250,21 @@ where
250250

251251
stream.write_u16(self.keep_alive).await?;
252252

253-
total_writen_bytes += self.connect_properties.async_write(stream).await?;
253+
total_written_bytes += self.connect_properties.async_write(stream).await?;
254254

255-
total_writen_bytes += self.client_id.async_write(stream).await?;
255+
total_written_bytes += self.client_id.async_write(stream).await?;
256256

257257
if let Some(last_will) = &self.last_will {
258-
total_writen_bytes += last_will.async_write(stream).await?;
258+
total_written_bytes += last_will.async_write(stream).await?;
259259
}
260260
if let Some(username) = &self.username {
261-
total_writen_bytes += username.async_write(stream).await?;
261+
total_written_bytes += username.async_write(stream).await?;
262262
}
263263
if let Some(password) = &self.password {
264-
total_writen_bytes += password.async_write(stream).await?;
264+
total_written_bytes += password.async_write(stream).await?;
265265
}
266266

267-
Ok(total_writen_bytes)
267+
Ok(total_written_bytes)
268268
}
269269
}
270270
}

mqrstt/src/packets/macros/properties_macros.rs

+86-86
Large diffs are not rendered by default.

mqrstt/src/packets/mod.rs

+105-28
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,7 @@ impl Packet {
103103
Packet::PubRel(_) => 0b0110_0010,
104104
Packet::PubComp(_) => 0b0111_0000,
105105
Packet::Subscribe(_) => 0b1000_0010,
106-
Packet::SubAck(_) => {
107-
unreachable!()
108-
}
106+
Packet::SubAck(_) => 0b1001_0000,
109107
Packet::Unsubscribe(_) => 0b1010_0010,
110108
Packet::UnsubAck(_) => 0b1011_0000,
111109
Packet::PingReq => 0b1100_0000,
@@ -168,17 +166,20 @@ impl Packet {
168166
p.wire_len().write_variable_integer(buf)?;
169167
p.write(buf)?;
170168
}
171-
Packet::SubAck(_) => {
172-
unreachable!()
169+
Packet::SubAck(p) => {
170+
buf.put_u8(0b1001_0000);
171+
p.wire_len().write_variable_integer(buf)?;
172+
p.write(buf)?;
173173
}
174174
Packet::Unsubscribe(p) => {
175175
buf.put_u8(0b1010_0010);
176176
p.wire_len().write_variable_integer(buf)?;
177177
p.write(buf)?;
178178
}
179-
Packet::UnsubAck(_) => {
180-
unreachable!();
181-
// buf.put_u8(0b1011_0000);
179+
Packet::UnsubAck(p) => {
180+
buf.put_u8(0b1011_0000);
181+
p.wire_len().write_variable_integer(buf)?;
182+
p.write(buf)?;
182183
}
183184
Packet::PingReq => {
184185
buf.put_u8(0b1100_0000);
@@ -259,17 +260,20 @@ impl Packet {
259260
written += p.wire_len().write_async_variable_integer(stream).await?;
260261
written += p.async_write(stream).await?;
261262
}
262-
Packet::SubAck(_) => {
263-
unreachable!()
263+
Packet::SubAck(p) => {
264+
stream.write_u8(0b1001_0000).await?;
265+
written += p.wire_len().write_async_variable_integer(stream).await?;
266+
written += p.async_write(stream).await?;
264267
}
265268
Packet::Unsubscribe(p) => {
266269
stream.write_u8(0b1010_0010).await?;
267270
written += p.wire_len().write_async_variable_integer(stream).await?;
268271
written += p.async_write(stream).await?;
269272
}
270-
Packet::UnsubAck(_) => {
271-
unreachable!();
272-
// stream.write_u8(0b1011_0000).await?;
273+
Packet::UnsubAck(p) => {
274+
stream.write_u8(0b1011_0000).await?;
275+
written += p.wire_len().write_async_variable_integer(stream).await?;
276+
written += p.async_write(stream).await?;
273277
}
274278
Packet::PingReq => {
275279
stream.write_u8(0b1100_0000).await?;
@@ -396,6 +400,28 @@ impl Display for Packet {
396400
}
397401
}
398402

403+
impl WireLength for Packet {
404+
fn wire_len(&self) -> usize {
405+
match self {
406+
Packet::Connect(p) => 1 + p.wire_len().variable_integer_len() + p.wire_len(),
407+
Packet::ConnAck(p) => 1 + p.wire_len().variable_integer_len() + p.wire_len(),
408+
Packet::Publish(p) => 1 + p.wire_len().variable_integer_len() + p.wire_len(),
409+
Packet::PubAck(p) => 1 + p.wire_len().variable_integer_len() + p.wire_len(),
410+
Packet::PubRec(p) => 1 + p.wire_len().variable_integer_len() + p.wire_len(),
411+
Packet::PubRel(p) => 1 + p.wire_len().variable_integer_len() + p.wire_len(),
412+
Packet::PubComp(p) => 1 + p.wire_len().variable_integer_len() + p.wire_len(),
413+
Packet::Subscribe(p) => 1 + p.wire_len().variable_integer_len() + p.wire_len(),
414+
Packet::SubAck(p) => 1 + p.wire_len().variable_integer_len() + p.wire_len(),
415+
Packet::Unsubscribe(p) => 1 + p.wire_len().variable_integer_len() + p.wire_len(),
416+
Packet::UnsubAck(p) => 1 + p.wire_len().variable_integer_len() + p.wire_len(),
417+
Packet::PingReq => 2,
418+
Packet::PingResp => 2,
419+
Packet::Disconnect(p) => 1 + p.wire_len().variable_integer_len() + p.wire_len(),
420+
Packet::Auth(p) => 1 + p.wire_len().variable_integer_len() + p.wire_len(),
421+
}
422+
}
423+
}
424+
399425
/// 2.1.2 MQTT Control Packet type
400426
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd)]
401427
pub enum PacketType {
@@ -447,33 +473,47 @@ impl std::fmt::Display for PacketType {
447473

448474
#[cfg(test)]
449475
mod tests {
476+
450477
use bytes::BytesMut;
451478

452479
use crate::packets::Packet;
453480

454481
use crate::tests::test_packets::*;
455482

456483
#[rstest::rstest]
457-
#[case(ping_req_case().1)]
458-
#[case(ping_resp_case().1)]
459-
#[case(connack_case().1)]
460-
#[case(create_subscribe_packet(1))]
461-
#[case(create_subscribe_packet(65335))]
462-
#[case(create_puback_packet(1))]
463-
#[case(create_puback_packet(65335))]
464-
#[case(create_disconnect_packet())]
465-
#[case(create_connack_packet(true))]
466-
#[case(create_connack_packet(false))]
467-
#[case(publish_packet_1())]
468-
#[case(publish_packet_2())]
469-
#[case(publish_packet_3())]
470-
#[case(publish_packet_4())]
471-
#[case(create_empty_publish_packet())]
484+
#[case::ping_req_case(ping_req_case().1)]
485+
#[case::ping_resp_case(ping_resp_case().1)]
486+
#[case::connack_case(connack_case().1)]
487+
#[case::create_subscribe_packet(create_subscribe_packet(1))]
488+
#[case::create_subscribe_packet(create_subscribe_packet(65335))]
489+
#[case::create_puback_packet(create_puback_packet(1))]
490+
#[case::create_puback_packet(create_puback_packet(65335))]
491+
#[case::create_disconnect_packet(create_disconnect_packet())]
492+
#[case::create_connack_packet(create_connack_packet(true))]
493+
#[case::create_connack_packet(create_connack_packet(false))]
494+
#[case::publish_packet_1(publish_packet_1())]
495+
#[case::publish_packet_2(publish_packet_2())]
496+
#[case::publish_packet_3(publish_packet_3())]
497+
#[case::publish_packet_4(publish_packet_4())]
498+
#[case::create_empty_publish_packet(create_empty_publish_packet())]
499+
#[case::subscribe(subscribe_case())]
500+
#[case::suback(suback_case())]
501+
#[case::unsubscribe(unsubscribe_case())]
502+
#[case::unsuback(unsuback_case())]
472503
fn test_write_read_write_read_cases(#[case] packet: Packet) {
504+
use crate::packets::WireLength;
505+
473506
let mut buffer = BytesMut::new();
507+
474508
packet.write(&mut buffer).unwrap();
509+
510+
let wire_len = packet.wire_len();
511+
assert_eq!(wire_len, buffer.len());
512+
475513
let res1 = Packet::read(&mut buffer).unwrap();
476514

515+
assert_eq!(packet, res1);
516+
477517
let mut buffer = BytesMut::new();
478518
res1.write(&mut buffer).unwrap();
479519
let res2 = Packet::read(&mut buffer).unwrap();
@@ -533,6 +573,43 @@ mod tests {
533573
assert_eq!(out, input)
534574
}
535575

576+
#[rstest::rstest]
577+
#[case::ping_req_case(ping_req_case().1)]
578+
#[case::ping_resp_case(ping_resp_case().1)]
579+
#[case::connack_case(connack_case().1)]
580+
#[case::create_subscribe_packet(create_subscribe_packet(1))]
581+
#[case::create_subscribe_packet(create_subscribe_packet(65335))]
582+
#[case::create_puback_packet(create_puback_packet(1))]
583+
#[case::create_puback_packet(create_puback_packet(65335))]
584+
#[case::create_disconnect_packet(create_disconnect_packet())]
585+
#[case::create_connack_packet(create_connack_packet(true))]
586+
#[case::create_connack_packet(create_connack_packet(false))]
587+
#[case::publish_packet_1(publish_packet_1())]
588+
#[case::publish_packet_2(publish_packet_2())]
589+
#[case::publish_packet_3(publish_packet_3())]
590+
#[case::publish_packet_4(publish_packet_4())]
591+
#[case::create_empty_publish_packet(create_empty_publish_packet())]
592+
#[case::subscribe(subscribe_case())]
593+
#[case::suback(suback_case())]
594+
#[case::unsubscribe(unsubscribe_case())]
595+
#[case::unsuback(unsuback_case())]
596+
#[tokio::test]
597+
async fn test_async_write_read_write_read_cases(#[case] packet: Packet) {
598+
use crate::packets::WireLength;
599+
600+
let mut buffer = Vec::with_capacity(1000);
601+
packet.async_write(&mut buffer).await.unwrap();
602+
603+
let wire_len = packet.wire_len();
604+
assert_eq!(wire_len, buffer.len());
605+
606+
let mut buf = buffer.as_slice();
607+
608+
let res1 = Packet::async_read(&mut buf).await.unwrap();
609+
610+
assert_eq!(packet, res1);
611+
}
612+
536613
// #[rstest::rstest]
537614
// #[case(&[59, 1, 0, 59])]
538615
// #[case(&[16, 14, 0, 4, 77, 81, 84, 84, 5, 247, 247, 252, 1, 17, 247, 247, 247])]

mqrstt/src/packets/pubcomp/mod.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -126,18 +126,18 @@ where
126126
{
127127
async fn async_write(&self, stream: &mut S) -> Result<usize, crate::packets::error::WriteError> {
128128
use crate::packets::mqtt_trait::MqttAsyncWrite;
129-
let mut total_writen_bytes = 2;
129+
let mut total_written_bytes = 2;
130130
self.packet_identifier.async_write(stream).await?;
131131

132132
if self.reason_code == PubCompReasonCode::Success && self.properties.reason_string.is_none() && self.properties.user_properties.is_empty() {
133-
return Ok(total_writen_bytes);
133+
return Ok(total_written_bytes);
134134
} else if self.properties.reason_string.is_none() && self.properties.user_properties.is_empty() {
135-
total_writen_bytes += self.reason_code.async_write(stream).await?;
135+
total_written_bytes += self.reason_code.async_write(stream).await?;
136136
} else {
137-
total_writen_bytes += self.reason_code.async_write(stream).await?;
138-
total_writen_bytes += self.properties.async_write(stream).await?;
137+
total_written_bytes += self.reason_code.async_write(stream).await?;
138+
total_written_bytes += self.properties.async_write(stream).await?;
139139
}
140-
Ok(total_writen_bytes)
140+
Ok(total_written_bytes)
141141
}
142142
}
143143

mqrstt/src/packets/pubrec/mod.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -118,18 +118,18 @@ where
118118
fn async_write(&self, stream: &mut S) -> impl std::future::Future<Output = Result<usize, crate::packets::error::WriteError>> {
119119
use crate::packets::mqtt_trait::MqttAsyncWrite;
120120
async move {
121-
let mut total_writen_bytes = 2;
121+
let mut total_written_bytes = 2;
122122
self.packet_identifier.async_write(stream).await?;
123123

124124
if self.reason_code == PubRecReasonCode::Success && self.properties.reason_string.is_none() && self.properties.user_properties.is_empty() {
125-
return Ok(total_writen_bytes);
125+
return Ok(total_written_bytes);
126126
} else if self.properties.reason_string.is_none() && self.properties.user_properties.is_empty() {
127-
total_writen_bytes += self.reason_code.async_write(stream).await?;
127+
total_written_bytes += self.reason_code.async_write(stream).await?;
128128
} else {
129-
total_writen_bytes += self.reason_code.async_write(stream).await?;
130-
total_writen_bytes += self.properties.async_write(stream).await?;
129+
total_written_bytes += self.reason_code.async_write(stream).await?;
130+
total_written_bytes += self.properties.async_write(stream).await?;
131131
}
132-
Ok(total_writen_bytes)
132+
Ok(total_written_bytes)
133133
}
134134
}
135135
}

mqrstt/src/packets/pubrel/mod.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -116,18 +116,18 @@ where
116116
fn async_write(&self, stream: &mut S) -> impl std::future::Future<Output = Result<usize, crate::packets::error::WriteError>> {
117117
use crate::packets::mqtt_trait::MqttAsyncWrite;
118118
async move {
119-
let mut total_writen_bytes = 2;
119+
let mut total_written_bytes = 2;
120120
self.packet_identifier.async_write(stream).await?;
121121

122122
if self.reason_code == PubRelReasonCode::Success && self.properties.reason_string.is_none() && self.properties.user_properties.is_empty() {
123-
return Ok(total_writen_bytes);
123+
return Ok(total_written_bytes);
124124
} else if self.properties.reason_string.is_none() && self.properties.user_properties.is_empty() {
125-
total_writen_bytes += self.reason_code.async_write(stream).await?;
125+
total_written_bytes += self.reason_code.async_write(stream).await?;
126126
} else {
127-
total_writen_bytes += self.reason_code.async_write(stream).await?;
128-
total_writen_bytes += self.properties.async_write(stream).await?;
127+
total_written_bytes += self.reason_code.async_write(stream).await?;
128+
total_written_bytes += self.properties.async_write(stream).await?;
129129
}
130-
Ok(total_writen_bytes)
130+
Ok(total_written_bytes)
131131
}
132132
}
133133
}

0 commit comments

Comments
 (0)