Skip to content

Commit 3439e95

Browse files
examples in readme and example dir
1 parent 5caa701 commit 3439e95

File tree

7 files changed

+179
-101
lines changed

7 files changed

+179
-101
lines changed

examples/tcp/Cargo.toml

+9
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ license = "MIT"
66

77
[dependencies]
88
smol = { version = "2" }
9+
futures = "0.3.31"
910

1011
tokio = { version = "1", features = ["full"] }
1112

@@ -15,6 +16,14 @@ mqrstt = { path = "../../mqrstt", features = ["logs"] }
1516
name = "tokio"
1617
path = "src/tokio.rs"
1718

19+
[[bin]]
20+
name = "ping_pong"
21+
path = "src/ping_pong.rs"
22+
23+
[[bin]]
24+
name = "ping_pong_smol"
25+
path = "src/ping_pong_smol.rs"
26+
1827
[[bin]]
1928
name = "smol"
2029
path = "src/smol.rs"

examples/tcp/src/ping_pong.rs

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
use mqrstt::{
2+
packets::{self, Packet},
3+
AsyncEventHandler, MqttClient, NetworkBuilder, NetworkStatus,
4+
};
5+
use tokio::time::Duration;
6+
7+
pub struct PingPong {
8+
pub client: MqttClient,
9+
}
10+
impl AsyncEventHandler for PingPong {
11+
// Handlers only get INCOMING packets.
12+
async fn handle(&mut self, event: packets::Packet) {
13+
match event {
14+
Packet::Publish(p) => {
15+
if let Ok(payload) = String::from_utf8(p.payload.to_vec()) {
16+
if payload.to_lowercase().contains("ping") {
17+
self.client.publish(p.topic.clone(), p.qos, p.retain, b"pong").await.unwrap();
18+
println!("Received Ping, Send pong!");
19+
}
20+
}
21+
}
22+
Packet::ConnAck(_) => {
23+
println!("Connected!")
24+
}
25+
_ => (),
26+
}
27+
}
28+
}
29+
30+
#[tokio::main]
31+
async fn main() {
32+
let (mut network, client) = NetworkBuilder::new_from_client_id("TokioTcpPingPongExample").tokio_network();
33+
34+
let stream = tokio::net::TcpStream::connect(("broker.emqx.io", 1883)).await.unwrap();
35+
let stream = tokio::io::BufStream::new(stream);
36+
37+
let mut pingpong = PingPong { client: client.clone() };
38+
39+
network.connect(stream, &mut pingpong).await.unwrap();
40+
41+
client.subscribe("mqrstt").await.unwrap();
42+
43+
let network_handle = tokio::spawn(async move {
44+
let result = network.run(&mut pingpong).await;
45+
(result, pingpong)
46+
});
47+
48+
tokio::time::sleep(Duration::from_secs(30)).await;
49+
client.disconnect().await.unwrap();
50+
51+
let (result, _pingpong) = network_handle.await.unwrap();
52+
assert!(result.is_ok());
53+
assert_eq!(result.unwrap(), NetworkStatus::OutgoingDisconnect);
54+
}

examples/tcp/src/ping_pong_smol.rs

+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
use mqrstt::{
2+
packets::{self, Packet},
3+
AsyncEventHandler, ConnectOptions, MqttClient, NetworkBuilder, NetworkStatus,
4+
};
5+
pub struct PingPong {
6+
pub client: MqttClient,
7+
}
8+
impl AsyncEventHandler for PingPong {
9+
// Handlers only get INCOMING packets. This can change later.
10+
async fn handle(&mut self, event: packets::Packet) {
11+
match event {
12+
Packet::Publish(p) => {
13+
if let Ok(payload) = String::from_utf8(p.payload.to_vec()) {
14+
if payload.to_lowercase().contains("ping") {
15+
self.client.publish(p.topic.clone(), p.qos, p.retain, b"pong").await.unwrap();
16+
println!("Received Ping, Send pong!");
17+
}
18+
}
19+
}
20+
Packet::ConnAck(_) => {
21+
println!("Connected!")
22+
}
23+
_ => (),
24+
}
25+
}
26+
}
27+
fn main() {
28+
smol::block_on(async {
29+
let (mut network, client) = NetworkBuilder::new_from_client_id("mqrsttSmolExample").smol_network();
30+
let stream = smol::net::TcpStream::connect(("broker.emqx.io", 1883)).await.unwrap();
31+
32+
let mut pingpong = PingPong { client: client.clone() };
33+
34+
network.connect(stream, &mut pingpong).await.unwrap();
35+
36+
// This subscribe is only processed when we run the network
37+
client.subscribe("mqrstt").await.unwrap();
38+
39+
let task_handle = smol::spawn(async move {
40+
let result = network.run(&mut pingpong).await;
41+
(result, pingpong)
42+
});
43+
44+
smol::Timer::after(std::time::Duration::from_secs(30)).await;
45+
client.disconnect().await.unwrap();
46+
47+
let (result, _pingpong) = task_handle.await;
48+
49+
assert!(result.is_ok());
50+
assert_eq!(result.unwrap(), NetworkStatus::OutgoingDisconnect);
51+
});
52+
}

examples/tcp/src/tokio.rs

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ async fn main() {
2121
let mut handler = Handler { byte_count: 0 };
2222

2323
let stream = tokio::net::TcpStream::connect(hostname).await.unwrap();
24+
let stream = tokio::io::BufStream::new(stream);
2425
let (mut network, client) = mqrstt::NetworkBuilder::new_from_client_id("TestClientABCDEFG").tokio_network();
2526

2627
network.connect(stream, &mut handler).await.unwrap();

README.md mqrstt/README.md

+60-100
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,8 @@ For a sync approach the stream has to implement the [`std::io::Read`] and [`std:
2424
- Keep alive depends on actual communication
2525

2626
### To do
27-
- no_std (Requires a lot of work to use no heap allocations and depend on stack)
2827
- Even More testing
29-
- More documentation
28+
- Add TLS examples to repository
3029

3130
## MSRV
3231
From 0.3 the tokio and smol variants will require MSRV: 1.75 due to async fn in trait feature.
@@ -38,159 +37,121 @@ From 0.3 the tokio and smol variants will require MSRV: 1.75 due to async fn in
3837
- Create a new connection when an error or disconnect is encountered
3938
- Handlers only get incoming packets
4039

41-
### TLS:
42-
TLS examples are too larger for a README. [TLS examples](https://github.com/GunnarMorrigan/mqrstt/tree/main/examples).
4340

4441
### Smol example:
4542
```rust
4643
use mqrstt::{
47-
MqttClient,
48-
ConnectOptions,
49-
new_smol,
5044
packets::{self, Packet},
51-
AsyncEventHandler,
52-
smol::NetworkStatus,
45+
AsyncEventHandler, MqttClient, NetworkBuilder, NetworkStatus,
5346
};
54-
use bytes::Bytes;
5547
pub struct PingPong {
5648
pub client: MqttClient,
5749
}
5850
impl AsyncEventHandler for PingPong {
5951
// Handlers only get INCOMING packets. This can change later.
60-
async fn handle(&mut self, event: packets::Packet {
52+
async fn handle(&mut self, event: packets::Packet) {
6153
match event {
6254
Packet::Publish(p) => {
6355
if let Ok(payload) = String::from_utf8(p.payload.to_vec()) {
6456
if payload.to_lowercase().contains("ping") {
65-
self.client
66-
.publish(
67-
p.topic.clone(),
68-
p.qos,
69-
p.retain,
70-
Bytes::from_static(b"pong"),
71-
)
72-
.await
73-
.unwrap();
57+
self.client.publish(p.topic.clone(), p.qos, p.retain, b"pong").await.unwrap();
7458
println!("Received Ping, Send pong!");
7559
}
7660
}
77-
},
78-
Packet::ConnAck(_) => { println!("Connected!") },
61+
}
62+
Packet::ConnAck(_) => {
63+
println!("Connected!")
64+
}
7965
_ => (),
8066
}
8167
}
8268
}
83-
smol::block_on(async {
84-
let options = ConnectOptions::new("mqrsttSmolExample");
85-
let (mut network, client) = new_smol(options);
86-
let stream = smol::net::TcpStream::connect(("broker.emqx.io", 1883))
87-
.await
88-
.unwrap();
89-
90-
let mut pingpong = PingPong {
91-
client: client.clone(),
92-
};
69+
fn main() {
70+
smol::block_on(async {
71+
let (mut network, client) = NetworkBuilder::new_from_client_id("mqrsttSmolExample").smol_network();
72+
let stream = smol::net::TcpStream::connect(("broker.emqx.io", 1883)).await.unwrap();
9373

94-
network.connect(stream, &mut pingpong).await.unwrap();
74+
let mut pingpong = PingPong { client: client.clone() };
9575

96-
// This subscribe is only processed when we run the network
97-
client.subscribe("mqrstt").await.unwrap();
76+
network.connect(stream, &mut pingpong).await.unwrap();
77+
78+
// This subscribe is only processed when we run the network
79+
client.subscribe("mqrstt").await.unwrap();
80+
81+
let task_handle = smol::spawn(async move {
82+
let result = network.run(&mut pingpong).await;
83+
(result, pingpong)
84+
});
85+
86+
smol::Timer::after(std::time::Duration::from_secs(30)).await;
87+
client.disconnect().await.unwrap();
88+
89+
let (result, _pingpong) = task_handle.await;
90+
91+
assert!(result.is_ok());
92+
assert_eq!(result.unwrap(), NetworkStatus::OutgoingDisconnect);
93+
});
94+
}
9895

99-
let (n, t) = futures::join!(
100-
async {
101-
loop {
102-
return match network.poll(&mut pingpong).await {
103-
Ok(NetworkStatus::Active) => continue,
104-
otherwise => otherwise,
105-
};
106-
}
107-
},
108-
async {
109-
smol::Timer::after(std::time::Duration::from_secs(30)).await;
110-
client.disconnect().await.unwrap();
111-
}
112-
);
113-
assert!(n.is_ok());
114-
});
11596
```
11697

11798
### Tokio example:
11899
```rust
119100
use mqrstt::{
120-
MqttClient,
121-
ConnectOptions,
122-
new_tokio,
123101
packets::{self, Packet},
124-
AsyncEventHandler,
125-
tokio::NetworkStatus,
102+
AsyncEventHandler, MqttClient, NetworkBuilder, NetworkStatus,
126103
};
127104
use tokio::time::Duration;
128-
use bytes::Bytes;
129105

130106
pub struct PingPong {
131107
pub client: MqttClient,
132108
}
133109
impl AsyncEventHandler for PingPong {
134-
// Handlers only get INCOMING packets. This can change later.
110+
// Handlers only get INCOMING packets.
135111
async fn handle(&mut self, event: packets::Packet) {
136112
match event {
137113
Packet::Publish(p) => {
138114
if let Ok(payload) = String::from_utf8(p.payload.to_vec()) {
139115
if payload.to_lowercase().contains("ping") {
140-
self.client
141-
.publish(
142-
p.topic.clone(),
143-
p.qos,
144-
p.retain,
145-
Bytes::from_static(b"pong"),
146-
)
147-
.await
148-
.unwrap();
116+
self.client.publish(p.topic.clone(), p.qos, p.retain, b"pong").await.unwrap();
149117
println!("Received Ping, Send pong!");
150118
}
151119
}
152-
},
153-
Packet::ConnAck(_) => { println!("Connected!") },
120+
}
121+
Packet::ConnAck(_) => {
122+
println!("Connected!")
123+
}
154124
_ => (),
155125
}
156126
}
157127
}
158128

159129
#[tokio::main]
160130
async fn main() {
161-
let options = ConnectOptions::new("TokioTcpPingPongExample");
162-
163-
let (mut network, client) = new_tokio(options);
164-
165-
let stream = tokio::net::TcpStream::connect(("broker.emqx.io", 1883))
166-
.await
167-
.unwrap();
168-
169-
let mut pingpong = PingPong {
170-
client: client.clone(),
171-
};
172-
131+
let (mut network, client) = NetworkBuilder::new_from_client_id("TokioTcpPingPongExample").tokio_network();
132+
133+
let stream = tokio::net::TcpStream::connect(("broker.emqx.io", 1883)).await.unwrap();
134+
let stream = tokio::io::BufStream::new(stream);
135+
136+
let mut pingpong = PingPong { client: client.clone() };
137+
173138
network.connect(stream, &mut pingpong).await.unwrap();
174-
139+
175140
client.subscribe("mqrstt").await.unwrap();
176-
177-
178-
let (n, _) = tokio::join!(
179-
async {
180-
loop {
181-
return match network.poll(&mut pingpong).await {
182-
Ok(NetworkStatus::Active) => continue,
183-
otherwise => otherwise,
184-
};
185-
}
186-
},
187-
async {
188-
tokio::time::sleep(Duration::from_secs(30)).await;
189-
client.disconnect().await.unwrap();
190-
}
191-
);
192-
assert!(n.is_ok());
141+
142+
let network_handle = tokio::spawn(async move {
143+
let result = network.run(&mut pingpong).await;
144+
(result, pingpong)
145+
});
146+
147+
tokio::time::sleep(Duration::from_secs(30)).await;
148+
client.disconnect().await.unwrap();
149+
150+
let (result, _pingpong) = network_handle.await.unwrap();
151+
assert!(result.is_ok());
152+
assert_eq!(result.unwrap(), NetworkStatus::OutgoingDisconnect);
193153
}
154+
194155
```
195156

196157
### Sync example:
@@ -284,7 +245,6 @@ Licensed under
284245
* Mozilla Public License, Version 2.0, [(MPL-2.0)](https://choosealicense.com/licenses/mpl-2.0/)
285246

286247
## Contribution
287-
288248
Unless you explicitly state otherwise, any contribution intentionally
289249
submitted for inclusion in the work by you, shall be licensed under MPL-2.0, without any additional terms or
290250
conditions.

mqrstt/src/packets/error.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ pub enum ReadError {
2020
IoError(#[from] std::io::Error),
2121
}
2222

23-
#[derive(Error, Clone, Debug)]
23+
#[derive(Error, Clone, Debug, PartialEq, Eq)]
2424
pub enum DeserializeError {
2525
#[error("Malformed packet: {0}")]
2626
MalformedPacketWithInfo(String),

mqrstt/src/tokio/network.rs

+2
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ where
5555
S: tokio::io::AsyncReadExt + tokio::io::AsyncWriteExt + Sized + Unpin + Send + 'static,
5656
{
5757
/// Initializes an MQTT connection with the provided configuration an stream
58+
///
59+
/// It is recommended to use a buffered stream. [`tokio::io::BufStream`] could be used to easily buffer both read and write.
5860
pub async fn connect(&mut self, mut stream: S, handler: &mut H) -> Result<(), ConnectionError> {
5961
let conn_ack = stream.connect(&self.options).await?;
6062
self.last_network_action = Instant::now();

0 commit comments

Comments
 (0)