Skip to content

Commit 2fb6cca

Browse files
committed
[WIP] Implement sending messages in backend
1 parent 94c65d9 commit 2fb6cca

File tree

13 files changed

+1142
-172
lines changed

13 files changed

+1142
-172
lines changed

Cargo.lock

+925-147
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,5 @@ identicon-rs = "4.0.1"
4040
adw = { package = "libadwaita", version = "0.3.0", features = ["gtk_v4_6", "v1_3"] }
4141
gtk = { package = "gtk4",version = "0.6.6", features = ["v4_10"] }
4242
mail-parser = "0.8.2"
43+
strum = { version = "0.24", features = ["derive"] }
44+
emailmessage = "0.2.2"

devzone/docker-compose.yaml

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
version: "3"
2+
3+
services:
4+
devzone:
5+
image: "ubuntu:lunar"
6+
stdin_open: true
7+
tty: true
8+
volumes:
9+
- ../:/app/bitmessage-rs:rw
10+
network_mode: host
11+
environment:
12+
- DISPLAY
13+
entrypoint: /bin/bash

flake.nix

+2
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
pango
5252
gtk4
5353
libadwaita
54+
adwaita-icon-theme
5455
openssl
5556
sqlite
5657
(if system == "aarch64-darwin" then
@@ -73,6 +74,7 @@
7374
gdk-pixbuf
7475
pango
7576
gtk4
77+
gnome.adwaita-icon-theme
7678
libadwaita
7779
openssl
7880
sqlite

src/network/messages.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ impl Object {
7979
object
8080
}
8181

82-
pub fn send(mut self, mut worker_sink: mpsc::Sender<WorkerCommand>) {
82+
pub fn do_proof_of_work(mut self, mut worker_sink: mpsc::Sender<WorkerCommand>) {
8383
let target = pow::get_pow_target(
8484
&self,
8585
pow::NETWORK_MIN_NONCE_TRIALS_PER_BYTE,

src/network/node/client.rs

+37-1
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
1+
use emailmessage::{header, Message, SinglePart};
12
use std::error::Error;
23

4+
use chrono::Utc;
35
use futures::{
46
channel::{mpsc, oneshot},
57
SinkExt,
68
};
79
use libp2p::{Multiaddr, PeerId};
810

9-
use crate::{network::address::Address, repositories::sqlite::models};
11+
use crate::{
12+
network::address::Address,
13+
repositories::sqlite::models::{self, MessageStatus},
14+
};
1015

1116
use super::worker::{Folder, WorkerCommand};
1217

@@ -117,4 +122,35 @@ impl NodeClient {
117122
.expect("Sender not to be dropped")
118123
.expect("repo not to fail")
119124
}
125+
126+
pub async fn send_message(&mut self, from: String, to: String, title: String, body: String) {
127+
let (sender, receiver) = oneshot::channel();
128+
let m: Message<SinglePart<&str>> = Message::builder().subject(title).mime_body(
129+
SinglePart::builder()
130+
.header(header::ContentType(
131+
"text/plain; charset=utf8".parse().unwrap(),
132+
))
133+
.header(header::ContentTransferEncoding::QuotedPrintable)
134+
.body(&body),
135+
);
136+
let data = m.to_string().into_bytes();
137+
let msg = models::Message {
138+
hash: "".to_string(),
139+
sender: from.clone(),
140+
recipient: to,
141+
created_at: Utc::now().naive_utc(),
142+
status: MessageStatus::Unknown.to_string(),
143+
signature: Vec::new(),
144+
data,
145+
};
146+
147+
self.sender
148+
.send(WorkerCommand::SendMessage { msg, from, sender })
149+
.await
150+
.expect("Receiver not to be dropped");
151+
receiver
152+
.await
153+
.expect("Sender not to be dropped")
154+
.expect("repo not to fail")
155+
}
120156
}

src/network/node/handler.rs

+7-4
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
1-
use async_std::task;
2-
use sha2::Digest;
31
use std::{error::Error, sync::Arc};
42

53
use chrono::Utc;
64
use futures::{
75
channel::{mpsc, oneshot},
86
lock::Mutex,
9-
FutureExt, SinkExt,
7+
SinkExt,
108
};
119

1210
use crate::{
@@ -31,6 +29,7 @@ pub struct Handler {
3129
message_repo: Box<MessageRepositorySync>,
3230
requested_objects: Vec<String>, // TODO periodically request missing object from every connection we have
3331
worker_event_sender: mpsc::Sender<WorkerCommand>,
32+
pubkey_notifier_sink: mpsc::Sender<String>,
3433
}
3534

3635
impl Handler {
@@ -39,13 +38,15 @@ impl Handler {
3938
inventory_repo: Box<InventoryRepositorySync>,
4039
message_repo: Box<MessageRepositorySync>,
4140
worker_event_sender: mpsc::Sender<WorkerCommand>,
41+
pubkey_notifier_sink: mpsc::Sender<String>,
4242
) -> Handler {
4343
Handler {
4444
address_repo,
4545
inventory_repo: Arc::new(Mutex::new(inventory_repo)),
4646
message_repo,
4747
requested_objects: Vec::new(),
4848
worker_event_sender,
49+
pubkey_notifier_sink,
4950
}
5051
}
5152

@@ -201,6 +202,8 @@ impl Handler {
201202
.await
202203
.expect("repo not to fail");
203204

205+
self.pubkey_notifier_sink.send(tag_str).await.unwrap();
206+
204207
Ok(())
205208
}
206209

@@ -241,7 +244,7 @@ impl Handler {
241244
},
242245
expires,
243246
);
244-
obj.send(self.worker_event_sender.clone());
247+
obj.do_proof_of_work(self.worker_event_sender.clone());
245248
}
246249
}
247250

src/network/node/worker.rs

+119-10
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,17 @@
1-
use async_std::task;
21
use chrono::Utc;
32
use diesel::{
43
connection::SimpleConnection,
54
r2d2::{ConnectionManager, Pool},
65
SqliteConnection,
76
};
87
use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};
9-
use sha2::Digest;
10-
use std::{borrow::Cow, error::Error, fs, io, iter, time::Duration};
8+
use rand::distributions::{Alphanumeric, DistString};
9+
use std::{borrow::Cow, collections::HashMap, error::Error, fs, io, iter, time::Duration};
1110

1211
use directories::ProjectDirs;
1312
use futures::{
1413
channel::{mpsc, oneshot},
15-
select, FutureExt, SinkExt, StreamExt,
14+
select, StreamExt,
1615
};
1716
use libp2p::{
1817
core::upgrade::Version,
@@ -34,16 +33,17 @@ use crate::{
3433
BitmessageBehaviourEvent, BitmessageNetBehaviour, BitmessageProtocol,
3534
BitmessageProtocolCodec, BitmessageResponse,
3635
},
37-
messages::{self, MessageCommand, MessagePayload, ObjectKind, UnencryptedMsg},
36+
messages::{self, MessageCommand, MessagePayload, MsgEncoding, ObjectKind, UnencryptedMsg},
3837
},
39-
pow,
4038
repositories::{
4139
address::AddressRepositorySync,
4240
inventory::InventoryRepositorySync,
4341
message::MessageRepositorySync,
4442
sqlite::{
45-
address::SqliteAddressRepository, inventory::SqliteInventoryRepository,
46-
message::SqliteMessageRepository, models,
43+
address::SqliteAddressRepository,
44+
inventory::SqliteInventoryRepository,
45+
message::SqliteMessageRepository,
46+
models::{self, MessageStatus},
4747
},
4848
},
4949
};
@@ -135,6 +135,7 @@ pub enum WorkerCommand {
135135
},
136136
SendMessage {
137137
msg: models::Message,
138+
from: String,
138139
sender: oneshot::Sender<Result<(), DynError>>,
139140
},
140141
}
@@ -143,7 +144,12 @@ pub struct NodeWorker {
143144
local_peer_id: PeerId,
144145
swarm: Swarm<BitmessageNetBehaviour>,
145146
handler: Handler,
147+
command_sender: mpsc::Sender<WorkerCommand>,
146148
command_receiver: mpsc::Receiver<WorkerCommand>,
149+
150+
pubkey_notifier: mpsc::Receiver<String>,
151+
tracked_pubkeys: HashMap<String, bool>, // TODO populate it on startup
152+
147153
pending_commands: Vec<WorkerCommand>,
148154
_sqlite_connection_pool: Pool<ConnectionManager<SqliteConnection>>,
149155
common_topic: Sha256Topic,
@@ -251,6 +257,7 @@ impl NodeWorker {
251257
.expect("subscription not to fail");
252258

253259
let (sender, receiver) = mpsc::channel(0);
260+
let (pubkey_notifier_sink, pubkey_notifier) = mpsc::channel(0);
254261
let inventory_repo = Box::new(SqliteInventoryRepository::new(pool.clone()));
255262
let address_repo = Box::new(SqliteAddressRepository::new(pool.clone()));
256263
let message_repo = Box::new(SqliteMessageRepository::new(pool.clone()));
@@ -263,7 +270,11 @@ impl NodeWorker {
263270
inventory_repo.clone(),
264271
message_repo.clone(),
265272
sender.clone(),
273+
pubkey_notifier_sink,
266274
),
275+
command_sender: sender.clone(),
276+
pubkey_notifier,
277+
tracked_pubkeys: HashMap::new(),
267278
command_receiver: receiver,
268279
pending_commands: Vec::new(),
269280
_sqlite_connection_pool: pool,
@@ -384,6 +395,18 @@ impl NodeWorker {
384395
.expect("receiver not to be dropped"),
385396
},
386397
WorkerCommand::NonceCalculated { obj } => {
398+
match &obj.kind {
399+
ObjectKind::Msg { encrypted: _ } => self
400+
.messages_repo
401+
.update_message_status(
402+
bs58::encode(&obj.hash).into_string(),
403+
MessageStatus::Sent,
404+
)
405+
.await
406+
.unwrap(),
407+
_ => {}
408+
}
409+
387410
self.inventory_repo
388411
.store_object(bs58::encode(&obj.hash).into_string(), obj)
389412
.await
@@ -468,7 +491,71 @@ impl NodeWorker {
468491
.expect("receiver not to be dropped"),
469492
},
470493
},
471-
WorkerCommand::SendMessage { msg, sender } => {}
494+
WorkerCommand::SendMessage {
495+
mut msg,
496+
from,
497+
sender,
498+
} => {
499+
let identity = self
500+
.address_repo
501+
.get_by_ripe_or_tag(from)
502+
.await
503+
.unwrap()
504+
.unwrap();
505+
let recipient: Option<Address> = self
506+
.address_repo
507+
.get_by_ripe_or_tag(msg.recipient.clone())
508+
.await
509+
.unwrap();
510+
match recipient {
511+
Some(v) => {
512+
msg.status = MessageStatus::WaitingForPOW.to_string();
513+
let unenc_msg = UnencryptedMsg {
514+
behavior_bitfield: 0,
515+
sender_ripe: msg.sender.clone(),
516+
destination_ripe: msg.recipient.clone(),
517+
encoding: MsgEncoding::Simple,
518+
message: msg.data.clone(),
519+
public_encryption_key: v
520+
.public_encryption_key
521+
.unwrap()
522+
.serialize()
523+
.to_vec(),
524+
public_signing_key: v.public_signing_key.unwrap().serialize().to_vec(),
525+
};
526+
let encrypted = Self::serialize_and_encrypt_payload_pub(
527+
unenc_msg,
528+
&v.public_encryption_key.unwrap(),
529+
);
530+
let object = messages::Object::with_signing(
531+
&identity,
532+
ObjectKind::Msg { encrypted },
533+
Utc::now() + chrono::Duration::days(7),
534+
);
535+
536+
msg.hash = bs58::encode(&object.hash).into_string();
537+
self.messages_repo.save_model(msg.clone()).await.unwrap();
538+
object.do_proof_of_work(self.command_sender.clone());
539+
}
540+
None => {
541+
msg.status = MessageStatus::WaitingForPubkey.to_string();
542+
// we generate random hash value, cuz we don't really know real hash value of the message at the moment, and it's not that important
543+
msg.hash = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
544+
self.messages_repo.save_model(msg.clone()).await.unwrap();
545+
// send getpubkey request
546+
let obj = messages::Object::with_signing(
547+
&identity,
548+
ObjectKind::Getpubkey {
549+
tag: Address::new(bs58::decode(msg.recipient).into_vec().unwrap())
550+
.tag,
551+
},
552+
Utc::now() + chrono::Duration::days(7),
553+
);
554+
obj.do_proof_of_work(self.command_sender.clone());
555+
}
556+
}
557+
sender.send(Ok(())).unwrap();
558+
}
472559
};
473560
}
474561

@@ -492,11 +579,18 @@ impl NodeWorker {
492579
log::debug!("Shutting down network event loop...");
493580
return;
494581
},
495-
}
582+
},
583+
pubkey_notification = self.pubkey_notifier.next() => self.handle_pubkey_notification(pubkey_notification.unwrap()).await,
496584
}
497585
}
498586
}
499587

588+
async fn handle_pubkey_notification(&mut self, tag: String) {
589+
if let Some(_) = self.tracked_pubkeys.get(&tag) {
590+
// TODO seek for messages which haven't been sent yet because of empty public key of recipient
591+
}
592+
}
593+
500594
/// When we receive IdentityInfo, if the peer supports our Kademlia protocol, we add
501595
/// their listen addresses to the DHT, so they will be propagated to other peers.
502596
fn handle_identify_event(&mut self, identify_event: identify::Event) {
@@ -541,6 +635,21 @@ impl NodeWorker {
541635
.unwrap();
542636
encrypted
543637
}
638+
639+
pub fn serialize_and_encrypt_payload_pub<T>(
640+
object: T,
641+
public_key: &libsecp256k1::PublicKey,
642+
) -> Vec<u8>
643+
where
644+
T: Serialize,
645+
{
646+
let encrypted = ecies::encrypt(
647+
&public_key.serialize(),
648+
serde_cbor::to_vec(&object).unwrap().as_ref(),
649+
)
650+
.unwrap();
651+
encrypted
652+
}
544653
}
545654

546655
fn extract_peer_id_from_multiaddr(

src/repositories/message.rs

+7-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use async_trait::async_trait;
44

55
use crate::network::messages::UnencryptedMsg;
66

7-
use super::sqlite::models;
7+
use super::sqlite::models::{self, MessageStatus};
88

99
#[async_trait]
1010
pub trait MessageRepository {
@@ -30,6 +30,12 @@ pub trait MessageRepository {
3030
&self,
3131
address: String,
3232
) -> Result<Vec<models::Message>, Box<dyn Error>>;
33+
34+
async fn update_message_status(
35+
&mut self,
36+
hash: String,
37+
status: MessageStatus,
38+
) -> Result<(), Box<dyn Error>>;
3339
}
3440

3541
pub type MessageRepositorySync = dyn MessageRepository + Sync + Send;

0 commit comments

Comments
 (0)