Skip to content

Commit 1040da6

Browse files
linter
1 parent 4fc4924 commit 1040da6

File tree

27 files changed

+463
-563
lines changed

27 files changed

+463
-563
lines changed

.github/workflows/rust.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ jobs:
3434
# run clippy to verify we have no warnings
3535
- run: cargo fetch
3636
- name: cargo clippy
37-
run: cargo clippy --all-targets --all-features -- -D warnings
37+
run: cargo clippy -p mqrstt
3838

3939
test:
4040
name: Test

README.md

+3-3
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ pub struct PingPong {
5757
}
5858
impl AsyncEventHandler for PingPong {
5959
// Handlers only get INCOMING packets. This can change later.
60-
async fn handle(&mut self, event: packets::Packet) -> () {
60+
async fn handle(&mut self, event: packets::Packet {
6161
match event {
6262
Packet::Publish(p) => {
6363
if let Ok(payload) = String::from_utf8(p.payload.to_vec()) {
@@ -132,7 +132,7 @@ pub struct PingPong {
132132
}
133133
impl AsyncEventHandler for PingPong {
134134
// Handlers only get INCOMING packets. This can change later.
135-
async fn handle(&mut self, event: packets::Packet) -> () {
135+
async fn handle(&mut self, event: packets::Packet) {
136136
match event {
137137
Packet::Publish(p) => {
138138
if let Ok(payload) = String::from_utf8(p.payload.to_vec()) {
@@ -212,7 +212,7 @@ pub struct PingPong {
212212

213213
impl EventHandler for PingPong {
214214
// Handlers only get INCOMING packets. This can change later.
215-
fn handle(&mut self, event: packets::Packet) -> () {
215+
fn handle(&mut self, event: packets::Packet) {
216216
match event {
217217
Packet::Publish(p) => {
218218
if let Ok(payload) = String::from_utf8(p.payload.to_vec()) {

mqrstt/src/error.rs

+8-2
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,8 @@ pub enum HandlerError {
6969
#[error("The incoming channel between network and handler is closed")]
7070
IncomingNetworkChannelClosed,
7171

72-
#[error("The outgoing channel between handler and network is closed: {0}")]
73-
OutgoingNetworkChannelClosed(#[from] SendError<Packet>),
72+
#[error("The outgoing channel between handler and network is closed")]
73+
OutgoingNetworkChannelClosed,
7474

7575
#[error("Channel between client and handler closed")]
7676
ClientChannelClosed,
@@ -88,6 +88,12 @@ pub enum HandlerError {
8888
UnexpectedPacket(PacketType),
8989
}
9090

91+
impl From<SendError<Packet>> for HandlerError {
92+
fn from(_: SendError<Packet>) -> Self {
93+
HandlerError::OutgoingNetworkChannelClosed
94+
}
95+
}
96+
9197
/// Errors producable by the [`crate::MqttClient`]
9298
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
9399
pub enum ClientError {

mqrstt/src/event_handlers.rs

+3-5
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@ pub trait AsyncEventHandler {
1313

1414
/// This is a simple no operation handler.
1515
impl AsyncEventHandler for () {
16-
fn handle(&mut self, _: Packet) -> impl Future<Output = ()> + Send + Sync {
17-
async {}
18-
}
16+
async fn handle(&mut self, _: Packet) {}
1917
}
2018

2119
pub trait EventHandler {
@@ -60,7 +58,7 @@ pub mod example_handlers {
6058
}
6159

6260
impl AsyncEventHandler for PingResp {
63-
async fn handle(&mut self, event: packets::Packet) -> () {
61+
async fn handle(&mut self, event: packets::Packet) {
6462
use Packet::*;
6563
if event == PingResp {
6664
self.ping_resp_received += 1;
@@ -91,7 +89,7 @@ pub mod example_handlers {
9189
}
9290

9391
impl AsyncEventHandler for PingPong {
94-
async fn handle(&mut self, event: packets::Packet) -> () {
92+
async fn handle(&mut self, event: packets::Packet) {
9593
match event {
9694
Packet::Publish(p) => {
9795
if let Ok(payload) = String::from_utf8(p.payload.to_vec()) {

mqrstt/src/lib.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ pub mod smol;
123123
/// Contains the reader and writer parts for the tokio runtime.
124124
///
125125
/// Module [`crate::tokio`] contains both a synchronized and concurrent approach to call the users `Handler`.
126-
#[cfg(any(feature = "tokio"))]
126+
#[cfg(feature = "tokio")]
127127
pub mod tokio;
128128

129129
/// Error types that the user can see during operation of the client.
@@ -342,7 +342,7 @@ mod smol_lib_test {
342342
});
343343
}
344344

345-
#[cfg(all(target_family = "windows"))]
345+
#[cfg(target_family = "windows")]
346346
#[test]
347347
fn test_close_write_tcp_stream_smol() {
348348
use crate::error::ConnectionError;
@@ -409,7 +409,8 @@ mod tokio_lib_test {
409409
network.connect(stream, &mut pingresp).await.unwrap();
410410

411411
let network_handle = tokio::task::spawn(async move {
412-
network.run(&mut pingresp).await;
412+
let result = network.run(&mut pingresp).await;
413+
// check result and or restart the connection
413414
pingresp
414415
});
415416

mqrstt/src/packets/auth/mod.rs

+4-7
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
mod properties;
2-
use std::future::Future;
32

43
pub use properties::AuthProperties;
54
mod reason_code;
@@ -48,12 +47,10 @@ impl<S> crate::packets::mqtt_trait::PacketAsyncWrite<S> for Auth
4847
where
4948
S: tokio::io::AsyncWrite + Unpin,
5049
{
51-
fn async_write(&self, stream: &mut S) -> impl Future<Output = Result<usize, crate::packets::error::WriteError>> {
52-
async move {
53-
let reason_code_writen = self.reason_code.async_write(stream).await?;
54-
let properties_writen = self.properties.async_write(stream).await?;
55-
Ok(reason_code_writen + properties_writen)
56-
}
50+
async fn async_write(&self, stream: &mut S) -> Result<usize, crate::packets::error::WriteError> {
51+
let reason_code_writen = self.reason_code.async_write(stream).await?;
52+
let properties_writen = self.properties.async_write(stream).await?;
53+
Ok(reason_code_writen + properties_writen)
5754
}
5855
}
5956

mqrstt/src/packets/connack/mod.rs

+33-41
Original file line numberDiff line numberDiff line change
@@ -53,21 +53,19 @@ impl<S> PacketAsyncRead<S> for ConnAck
5353
where
5454
S: tokio::io::AsyncRead + Unpin,
5555
{
56-
fn async_read(_: u8, _: usize, stream: &mut S) -> impl std::future::Future<Output = Result<(Self, usize), super::error::ReadError>> {
57-
async move {
58-
let (connack_flags, read_bytes) = ConnAckFlags::async_read(stream).await?;
59-
let (reason_code, reason_code_read_bytes) = ConnAckReasonCode::async_read(stream).await?;
60-
let (connack_properties, connack_properties_read_bytes) = ConnAckProperties::async_read(stream).await?;
61-
62-
Ok((
63-
Self {
64-
connack_flags,
65-
reason_code,
66-
connack_properties,
67-
},
68-
read_bytes + reason_code_read_bytes + connack_properties_read_bytes,
69-
))
70-
}
56+
async fn async_read(_: u8, _: usize, stream: &mut S) -> Result<(Self, usize), super::error::ReadError> {
57+
let (connack_flags, read_bytes) = ConnAckFlags::async_read(stream).await?;
58+
let (reason_code, reason_code_read_bytes) = ConnAckReasonCode::async_read(stream).await?;
59+
let (connack_properties, connack_properties_read_bytes) = ConnAckProperties::async_read(stream).await?;
60+
61+
Ok((
62+
Self {
63+
connack_flags,
64+
reason_code,
65+
connack_properties,
66+
},
67+
read_bytes + reason_code_read_bytes + connack_properties_read_bytes,
68+
))
7169
}
7270
}
7371

@@ -85,15 +83,13 @@ impl<S> crate::packets::mqtt_trait::PacketAsyncWrite<S> for ConnAck
8583
where
8684
S: tokio::io::AsyncWrite + Unpin,
8785
{
88-
fn async_write(&self, stream: &mut S) -> impl std::future::Future<Output = Result<usize, crate::packets::error::WriteError>> {
89-
async move {
90-
use crate::packets::mqtt_trait::MqttAsyncWrite;
91-
let connack_flags_writen = self.connack_flags.async_write(stream).await?;
92-
let reason_code_writen = self.reason_code.async_write(stream).await?;
93-
let connack_properties_writen = self.connack_properties.async_write(stream).await?;
94-
95-
Ok(connack_flags_writen + reason_code_writen + connack_properties_writen)
96-
}
86+
async fn async_write(&self, stream: &mut S) -> Result<usize, crate::packets::error::WriteError> {
87+
use crate::packets::mqtt_trait::MqttAsyncWrite;
88+
let connack_flags_writen = self.connack_flags.async_write(stream).await?;
89+
let reason_code_writen = self.reason_code.async_write(stream).await?;
90+
let connack_properties_writen = self.connack_properties.async_write(stream).await?;
91+
92+
Ok(connack_flags_writen + reason_code_writen + connack_properties_writen)
9793
}
9894
}
9995

@@ -114,16 +110,14 @@ impl<S> MqttAsyncRead<S> for ConnAckFlags
114110
where
115111
S: tokio::io::AsyncRead + Unpin,
116112
{
117-
fn async_read(stream: &mut S) -> impl std::future::Future<Output = Result<(Self, usize), super::error::ReadError>> {
118-
async move {
119-
let byte = stream.read_u8().await?;
120-
Ok((
121-
Self {
122-
session_present: (byte & 0b00000001) == 0b00000001,
123-
},
124-
1,
125-
))
126-
}
113+
async fn async_read(stream: &mut S) -> Result<(Self, usize), super::error::ReadError> {
114+
let byte = stream.read_u8().await?;
115+
Ok((
116+
Self {
117+
session_present: (byte & 0b00000001) == 0b00000001,
118+
},
119+
1,
120+
))
127121
}
128122
}
129123

@@ -154,14 +148,12 @@ impl<S> crate::packets::mqtt_trait::MqttAsyncWrite<S> for ConnAckFlags
154148
where
155149
S: tokio::io::AsyncWrite + Unpin,
156150
{
157-
fn async_write(&self, stream: &mut S) -> impl std::future::Future<Output = Result<usize, crate::packets::error::WriteError>> {
158-
async move {
159-
use tokio::io::AsyncWriteExt;
160-
let byte = self.session_present as u8;
151+
async fn async_write(&self, stream: &mut S) -> Result<usize, crate::packets::error::WriteError> {
152+
use tokio::io::AsyncWriteExt;
153+
let byte = self.session_present as u8;
161154

162-
stream.write_u8(byte).await?;
163-
Ok(1)
164-
}
155+
stream.write_u8(byte).await?;
156+
Ok(1)
165157
}
166158
}
167159

mqrstt/src/packets/connect/connect_flags.rs

+8-12
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,9 @@ impl<S> MqttAsyncRead<S> for ConnectFlags
7777
where
7878
S: tokio::io::AsyncRead + Unpin,
7979
{
80-
fn async_read(stream: &mut S) -> impl std::future::Future<Output = Result<(Self, usize), crate::packets::error::ReadError>> {
81-
async move {
82-
let byte = stream.read_u8().await?;
83-
Ok((ConnectFlags::from_u8(byte)?, 1))
84-
}
80+
async fn async_read(stream: &mut S) -> Result<(Self, usize), crate::packets::error::ReadError> {
81+
let byte = stream.read_u8().await?;
82+
Ok((ConnectFlags::from_u8(byte)?, 1))
8583
}
8684
}
8785

@@ -96,13 +94,11 @@ impl<S> MqttAsyncWrite<S> for ConnectFlags
9694
where
9795
S: tokio::io::AsyncWrite + Unpin,
9896
{
99-
fn async_write(&self, stream: &mut S) -> impl std::future::Future<Output = Result<usize, crate::packets::error::WriteError>> {
100-
async move {
101-
use tokio::io::AsyncWriteExt;
102-
let byte = self.into_u8()?;
103-
stream.write_u8(byte).await?;
97+
async fn async_write(&self, stream: &mut S) -> Result<usize, crate::packets::error::WriteError> {
98+
use tokio::io::AsyncWriteExt;
99+
let byte = self.into_u8()?;
100+
stream.write_u8(byte).await?;
104101

105-
Ok(1)
106-
}
102+
Ok(1)
107103
}
108104
}

mqrstt/src/packets/connect/last_will.rs

+5-7
Original file line numberDiff line numberDiff line change
@@ -82,14 +82,12 @@ impl<S> MqttAsyncWrite<S> for LastWill
8282
where
8383
S: tokio::io::AsyncWrite + Unpin,
8484
{
85-
fn async_write(&self, stream: &mut S) -> impl std::future::Future<Output = Result<usize, crate::packets::error::WriteError>> {
86-
async move {
87-
let properties_written = self.last_will_properties.async_write(stream).await?;
88-
let topic_written = self.topic.async_write(stream).await?;
89-
let payload_written = self.payload.async_write(stream).await?;
85+
async fn async_write(&self, stream: &mut S) -> Result<usize, crate::packets::error::WriteError> {
86+
let properties_written = self.last_will_properties.async_write(stream).await?;
87+
let topic_written = self.topic.async_write(stream).await?;
88+
let payload_written = self.payload.async_write(stream).await?;
9089

91-
Ok(properties_written + topic_written + payload_written)
92-
}
90+
Ok(properties_written + topic_written + payload_written)
9391
}
9492
}
9593

mqrstt/src/packets/macros/properties_macros.rs

+7-33
Original file line numberDiff line numberDiff line change
@@ -34,40 +34,14 @@ macro_rules! define_properties {
3434
}
3535

3636
impl<S> $crate::packets::mqtt_trait::MqttAsyncWrite<S> for $name where S: tokio::io::AsyncWrite + Unpin {
37-
fn async_write(&self, stream: &mut S) -> impl std::future::Future<Output = Result<usize, crate::packets::error::WriteError>> {
38-
async move {
39-
let mut bytes_writen = 0;
40-
$crate::packets::VariableInteger::write_async_variable_integer(&self.wire_len(), stream).await?;
41-
$(
42-
$crate::packets::macros::properties_write!(self, bytes_writen, stream, PropertyType::$prop_variant);
43-
)*
44-
45-
Ok(bytes_writen)
46-
}
47-
48-
// let (len, length_variable_integer) = <usize as crate::packets::primitive::VariableInteger>::read_async_variable_integer(stream).await?;
49-
// if len == 0 {
50-
// return Ok((Self::default(), length_variable_integer));
51-
// }
52-
53-
// let mut properties = $name::default();
54-
55-
// let mut read_property_bytes = 0;
56-
// loop {
57-
// let (prop, read_bytes) = crate::packets::PropertyType::async_read(stream).await?;
58-
// read_property_bytes += read_bytes;
59-
// match prop {
60-
// $(
61-
// $crate::packets::macros::properties_read_match_branch_name!($prop_variant) => $crate::packets::macros::properties_read_match_branch_body!(stream, properties, read_property_bytes, PropertyType::$prop_variant),
62-
// )*
63-
// e => return Err($crate::packets::error::ReadError::DeserializeError(DeserializeError::UnexpectedProperty(e, PacketType::PubRel))),
64-
// }
65-
// if read_property_bytes == len {
66-
// break;
67-
// }
68-
// }
37+
async fn async_write(&self, stream: &mut S) -> Result<usize, crate::packets::error::WriteError> {
38+
let mut bytes_writen = 0;
39+
$crate::packets::VariableInteger::write_async_variable_integer(&self.wire_len(), stream).await?;
40+
$(
41+
$crate::packets::macros::properties_write!(self, bytes_writen, stream, PropertyType::$prop_variant);
42+
)*
6943

70-
// Ok((properties, length_variable_integer + read_property_bytes))
44+
Ok(bytes_writen)
7145
}
7246
}
7347

mqrstt/src/packets/macros/reason_code_macros.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ macro_rules! reason_code {
1414
} -> ())
1515
}
1616

17-
pub(crate) fn to_u8(&self) -> u8 {
17+
pub(crate) fn to_u8(self) -> u8 {
1818
$crate::packets::macros::reason_code_match_write!(@ $name, self, {
1919
$($code,)*
2020
} -> ())

mqrstt/src/packets/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,7 @@ impl Packet {
347347
#[cfg(feature = "logs")]
348348
tracing::trace!("Read packet header: {:?}", header);
349349

350-
Ok(Packet::async_read_packet(header, stream).await?)
350+
Packet::async_read_packet(header, stream).await
351351
}
352352

353353
pub fn read(buffer: &mut BytesMut) -> Result<Packet, error::ReadBytes<DeserializeError>> {

mqrstt/src/packets/mqtt_trait/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ pub trait MqttWrite: Sized {
4343
fn write(&self, buf: &mut BytesMut) -> Result<(), crate::packets::error::SerializeError>;
4444
}
4545

46-
impl<'a, T> MqttWrite for &'a T
46+
impl<T> MqttWrite for &T
4747
where
4848
T: MqttWrite,
4949
{

0 commit comments

Comments
 (0)