Skip to content

Commit

Permalink
fix: upgrade protocol isolated
Browse files Browse the repository at this point in the history
  • Loading branch information
zsluedem committed Nov 4, 2023
1 parent 0ec1d26 commit 979ba42
Show file tree
Hide file tree
Showing 7 changed files with 399 additions and 63 deletions.
13 changes: 12 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ discv5 = { workspace = true }
ethers = { workspace = true }
eyre = "0.6.8"
futures = "0.3.28"
futures-bounded = "0.2.0"
lazy_static = { workspace = true }
libp2p-mplex = { version = "0.40.0" }
silius-primitives = { path = "../primitives" }
Expand Down
21 changes: 15 additions & 6 deletions crates/p2p/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::{
enr::{keypair_to_combine, EnrExt},
gossipsub::topic,
peer_manager::PeerManagerEvent,
request_response::{self, Ping, Request, RequestId, Response},
request_response::{self, Ping, Pong, Request, RequestId, Response},
};

struct TokioExecutor;
Expand Down Expand Up @@ -206,11 +206,20 @@ impl Network {
request,
response_sender,
..
} => Some(NetworkEvent::RequestMessage {
peer_id,
request,
response_sender,
}),
} => match request {
Request::Ping(ping) => {
let response = Response::Pong(Default::default());
response_sender
.send(response)
.expect("channel should exist");
None
}
_ => Some(NetworkEvent::RequestMessage {
peer_id,
request,
response_sender,
}),
},
request_response::Event::Response {
peer_id, response, ..
} => Some(NetworkEvent::ResponseMessage { peer_id, response }),
Expand Down
57 changes: 45 additions & 12 deletions crates/p2p/src/request_response/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ use libp2p::{
};

use super::{
handler::{Handler, HandlerEvent},
handler::{Handler, HandlerEvent, OutboundInfo},
models::{Request, RequestId, Response},
outbound::OutboundContainer,
BoundError,
};

#[derive(Debug)]
Expand Down Expand Up @@ -48,6 +48,10 @@ pub enum Event {
peer_id: PeerId,
request_id: RequestId,
},
UpgradeFailure {
peer_id: PeerId,
request_id: RequestId,
},
}

#[derive(Debug)]
Expand All @@ -66,9 +70,11 @@ pub enum OutboundFailure {
ConnectionClosed,
/// The remote supports none of the requested protocols.
UnsupportedProtocols,
/// Error happended while handleing the outbound
BoundError(BoundError),
}

#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug)]
pub enum InboundFailure {
/// The inbound request timed out, either while reading the
/// incoming request or before a response is sent, e.g. if
Expand All @@ -84,6 +90,8 @@ pub enum InboundFailure {
/// due to the [`ResponseChannel`] being dropped instead of
/// being passed to [`Behaviour::send_response`].
ResponseOmission,
/// Error happended while handleing the inbound
BoundError(BoundError),
}
#[derive(Debug)]
pub struct Config {
Expand All @@ -103,10 +111,11 @@ pub struct Behaviour {
next_request_id: RequestId,
/// The next (inbound) request ID.
next_inbound_id: Arc<AtomicU64>,
/// The next (outbound) request ID
/// pending events to return from `Poll`
pending_events: VecDeque<ToSwarm<Event, OutboundContainer>>,
pending_events: VecDeque<ToSwarm<Event, OutboundInfo>>,
connected: HashMap<PeerId, Vec<Connection>>,
pending_outbound_requests: HashMap<PeerId, Vec<OutboundContainer>>,
pending_outbound_requests: HashMap<PeerId, Vec<OutboundInfo>>,
}

impl Behaviour {
Expand All @@ -129,7 +138,7 @@ impl Behaviour {

pub fn send_request(&mut self, peer: &PeerId, request: Request) -> RequestId {
let request_id = self.next_request_id();
let request = OutboundContainer {
let request = OutboundInfo {
request,
request_id,
};
Expand All @@ -155,11 +164,7 @@ impl Behaviour {
sender.send(response)
}

fn try_send_request(
&mut self,
peer: &PeerId,
request: OutboundContainer,
) -> Option<OutboundContainer> {
fn try_send_request(&mut self, peer: &PeerId, request: OutboundInfo) -> Option<OutboundInfo> {
if let Some(connections) = self.connected.get_mut(peer) {
if connections.is_empty() {
return Some(request);
Expand Down Expand Up @@ -387,6 +392,34 @@ impl NetworkBehaviour for Behaviour {
response,
}));
}
HandlerEvent::InboundTimeout(request_id) => {
self.remove_pending_inbound_response(&peer_id, connection_id, &request_id);
self.pending_events
.push_back(ToSwarm::GenerateEvent(Event::InboundFailure {
peer_id,
request_id,
error: InboundFailure::Timeout,
}))
}
HandlerEvent::InboundError { request_id, error } => {
self.remove_pending_inbound_response(&peer_id, connection_id, &request_id);
self.pending_events
.push_back(ToSwarm::GenerateEvent(Event::InboundFailure {
peer_id,
request_id,
error: InboundFailure::BoundError(error),
}))
}
HandlerEvent::OutboundError { request_id, error } => {
self.remove_pending_outbound_response(&peer_id, connection_id, request_id);
self.pending_events
.push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
peer_id,
request_id,
error: OutboundFailure::BoundError(error),
}))
}
HandlerEvent::DialUpgradeTimeout(_) => {}
HandlerEvent::ResponseSent(request_id) => {
self.remove_pending_outbound_response(&peer_id, connection_id, request_id);

Expand All @@ -407,7 +440,7 @@ impl NetworkBehaviour for Behaviour {
}));
}
HandlerEvent::OutboundTimeout(request_id) => {
self.remove_pending_inbound_response(&peer_id, connection_id, &request_id);
self.remove_pending_outbound_response(&peer_id, connection_id, request_id);

self.pending_events
.push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
Expand Down
Loading

0 comments on commit 979ba42

Please sign in to comment.