Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(client,server): use quinn_udp for I/O #1604

Merged
merged 50 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
2c42747
Initial commit
larseggert Jan 23, 2024
d4a626f
refactor(server): replace mio with tokio
mxinden Jan 24, 2024
a014dc6
Move ready logic into fn
mxinden Jan 30, 2024
4e27a78
Extend expect docs
mxinden Jan 30, 2024
9b2b15e
Restrict tokio features
mxinden Jan 30, 2024
d205da4
Only process datagram once
mxinden Jan 30, 2024
256f04c
Remove superfluous pub
mxinden Jan 30, 2024
f29ea6d
fmt
mxinden Jan 30, 2024
f421e91
Merge branch 'feat-ecn-io' of https://github.com/larseggert/neqo into…
mxinden Jan 30, 2024
a299070
Fix send path
mxinden Jan 30, 2024
50afb27
Fix receive path
mxinden Jan 30, 2024
69c0c91
Instantiate socket state once
mxinden Jan 30, 2024
7010f09
Fix busy loop
mxinden Jan 30, 2024
879afda
Merge branch 'main' of https://github.com/mozilla/neqo into quinn-udp
mxinden Feb 6, 2024
ea4a055
Have neqo-client use quinn-udp
mxinden Feb 6, 2024
c519783
Add TODO
mxinden Feb 7, 2024
1f820c7
Await writable
mxinden Feb 9, 2024
c1043cf
Unify tx and rx
mxinden Feb 9, 2024
8ae9dbf
Introduce wrapper type Socket
mxinden Feb 9, 2024
d49b158
Move bind to common
mxinden Feb 9, 2024
3d61bc2
Check if datagram was sent as a whole and avoid allocation
mxinden Feb 9, 2024
4f2e56a
Make into_data pub(crate)
mxinden Feb 10, 2024
84a4b63
Refactor send
mxinden Feb 10, 2024
72c84d6
Reference issue
mxinden Feb 10, 2024
a59c533
Merge branch 'main' of https://github.com/mozilla/neqo into quinn-udp
mxinden Feb 10, 2024
5cbdb96
Remove debugs
mxinden Feb 10, 2024
a9eef1a
Fix test
mxinden Feb 10, 2024
8288143
Reduce diff
mxinden Feb 10, 2024
ab7912a
Reduce diff
mxinden Feb 10, 2024
b3fb48f
Pin quinn-udp to rev
mxinden Feb 10, 2024
bff0055
Address clippy lints
mxinden Feb 10, 2024
d365689
fmt
mxinden Feb 10, 2024
541d9cb
fmt
mxinden Feb 10, 2024
070a46b
clippy
mxinden Feb 10, 2024
613a0df
clippy
mxinden Feb 10, 2024
5146152
Pass None ttl
mxinden Feb 10, 2024
dff1482
Merge branch 'main' into quinn-udp
larseggert Feb 12, 2024
00f0eeb
Merge branch 'main' into quinn-udp
larseggert Feb 12, 2024
6f330d3
Debug race condition on Windows
mxinden Feb 13, 2024
e9ac36c
Debug windows failure
mxinden Feb 13, 2024
addea3c
Merge branch 'main' into quinn-udp
larseggert Feb 14, 2024
01d8af6
Merge branch 'main' into quinn-udp
larseggert Feb 15, 2024
8ced9d9
Have receiver use random port
mxinden Feb 18, 2024
e9ba397
Test with Ect1 instead of Ce
mxinden Feb 20, 2024
8a65b5f
Revert "Debug windows failure"
mxinden Feb 20, 2024
4bbe07a
Revert "Debug race condition on Windows"
mxinden Feb 20, 2024
4d26bf0
Fold tos_tx
mxinden Feb 20, 2024
6238994
Add reason to clippy lint ignore
mxinden Feb 20, 2024
a0d3093
fix: include quinn-udp IPv4-mapped IPv6 patch
mxinden Feb 23, 2024
babec25
Merge branch 'main' into quinn-udp
larseggert Feb 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion neqo-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ clap = { version = "4.4", features = ["derive"] }
futures = "0.3"
hex = "0.4"
log = { version = "0.4", default-features = false }
neqo-common = { path = "./../neqo-common" }
neqo-common = { path = "./../neqo-common", features = ["udp"] }
neqo-crypto = { path = "./../neqo-crypto" }
neqo-http3 = { path = "./../neqo-http3" }
neqo-qpack = { path = "./../neqo-qpack" }
Expand Down
95 changes: 17 additions & 78 deletions neqo-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@ use std::{
};

use clap::Parser;
use common::IpTos;
use futures::{
future::{select, Either},
FutureExt, TryFutureExt,
};
use neqo_common::{
self as common, event::Provider, hex, qdebug, qinfo, qlog::NeqoQlog, Datagram, Role,
self as common, event::Provider, hex, qdebug, qinfo, qlog::NeqoQlog, udp, Datagram, Role,
};
use neqo_crypto::{
constants::{TLS_AES_128_GCM_SHA256, TLS_AES_256_GCM_SHA384, TLS_CHACHA20_POLY1305_SHA256},
Expand All @@ -42,7 +41,7 @@ use neqo_transport::{
EmptyConnectionIdGenerator, Error as TransportError, StreamId, StreamType, Version,
};
use qlog::{events::EventImportance, streamer::QlogStreamer};
use tokio::{net::UdpSocket, time::Sleep};
use tokio::time::Sleep;
use url::{Origin, Url};

#[derive(Debug)]
Expand Down Expand Up @@ -351,21 +350,6 @@ impl QuicParameters {
}
}

async fn emit_datagram(socket: &UdpSocket, out_dgram: Datagram) -> Result<(), io::Error> {
let sent = match socket.send_to(&out_dgram, &out_dgram.destination()).await {
Ok(res) => res,
Err(ref err) if err.kind() != io::ErrorKind::WouldBlock => {
eprintln!("UDP send error: {err:?}");
0
}
Err(e) => return Err(e),
};
if sent != out_dgram.len() {
eprintln!("Unable to send all {} bytes of datagram", out_dgram.len());
}
Ok(())
}

fn get_output_file(
url: &Url,
output_dir: &Option<PathBuf>,
Expand Down Expand Up @@ -415,7 +399,7 @@ enum Ready {

// Wait for the socket to be readable or the timeout to fire.
async fn ready(
socket: &UdpSocket,
socket: &udp::Socket,
mut timeout: Option<&mut Pin<Box<Sleep>>>,
) -> Result<Ready, io::Error> {
let socket_ready = Box::pin(socket.readable()).map_ok(|()| Ready::Socket);
Expand All @@ -426,43 +410,6 @@ async fn ready(
select(socket_ready, timeout_ready).await.factor_first().0
}

fn read_dgram(
socket: &UdpSocket,
local_address: &SocketAddr,
) -> Result<Option<Datagram>, io::Error> {
let buf = &mut [0u8; 2048];
let (sz, remote_addr) = match socket.try_recv_from(&mut buf[..]) {
Err(ref err)
if err.kind() == io::ErrorKind::WouldBlock
|| err.kind() == io::ErrorKind::Interrupted =>
{
return Ok(None)
}
Err(err) => {
eprintln!("UDP recv error: {err:?}");
return Err(err);
}
Ok(res) => res,
};

if sz == buf.len() {
eprintln!("Might have received more than {} bytes", buf.len());
}

if sz == 0 {
eprintln!("zero length datagram received?");
Ok(None)
} else {
Ok(Some(Datagram::new(
remote_addr,
*local_address,
IpTos::default(),
None,
&buf[..sz],
)))
}
}

trait StreamHandler {
fn process_header_ready(&mut self, stream_id: StreamId, fin: bool, headers: Vec<Header>);
fn process_data_readable(
Expand Down Expand Up @@ -817,7 +764,7 @@ fn to_headers(values: &[impl AsRef<str>]) -> Vec<Header> {

struct ClientRunner<'a> {
local_addr: SocketAddr,
socket: &'a UdpSocket,
socket: &'a udp::Socket,
client: Http3Client,
handler: Handler<'a>,
timeout: Option<Pin<Box<Sleep>>>,
Expand All @@ -827,7 +774,7 @@ struct ClientRunner<'a> {
impl<'a> ClientRunner<'a> {
fn new(
args: &'a mut Args,
socket: &'a UdpSocket,
socket: &'a udp::Socket,
local_addr: SocketAddr,
remote_addr: SocketAddr,
hostname: &str,
Expand Down Expand Up @@ -880,7 +827,7 @@ impl<'a> ClientRunner<'a> {

match ready(self.socket, self.timeout.as_mut()).await? {
Ready::Socket => loop {
let dgram = read_dgram(self.socket, &self.local_addr)?;
let dgram = self.socket.recv(&self.local_addr)?;
if dgram.is_none() {
break;
}
Expand Down Expand Up @@ -915,7 +862,8 @@ impl<'a> ClientRunner<'a> {
loop {
match self.client.process(dgram.take(), Instant::now()) {
Output::Datagram(dgram) => {
emit_datagram(self.socket, dgram).await?;
self.socket.writable().await?;
self.socket.send(dgram)?;
}
Output::Callback(new_timeout) => {
qinfo!("Setting timeout of {:?}", new_timeout);
Expand Down Expand Up @@ -1051,16 +999,7 @@ async fn main() -> Res<()> {
SocketAddr::V6(..) => SocketAddr::new(IpAddr::V6(Ipv6Addr::from([0; 16])), 0),
};

let socket = match std::net::UdpSocket::bind(local_addr) {
Err(e) => {
eprintln!("Unable to bind UDP socket: {e}");
exit(1)
}
Ok(s) => s,
};
socket.set_nonblocking(true)?;
let socket = UdpSocket::from_std(socket)?;

let socket = udp::Socket::bind(local_addr)?;
let real_local = socket.local_addr().unwrap();
println!(
"{} Client connecting: {:?} -> {:?}",
Expand Down Expand Up @@ -1125,17 +1064,16 @@ mod old {
time::Instant,
};

use neqo_common::{event::Provider, qdebug, qinfo, Datagram};
use neqo_common::{event::Provider, qdebug, qinfo, udp, Datagram};
use neqo_crypto::{AuthenticationStatus, ResumptionToken};
use neqo_transport::{
Connection, ConnectionEvent, EmptyConnectionIdGenerator, Error, Output, State, StreamId,
StreamType,
};
use tokio::{net::UdpSocket, time::Sleep};
use tokio::time::Sleep;
use url::Url;

use super::{get_output_file, qlog_new, read_dgram, ready, Args, KeyUpdateState, Ready, Res};
use crate::emit_datagram;
use super::{get_output_file, qlog_new, ready, Args, KeyUpdateState, Ready, Res};

struct HandlerOld<'b> {
streams: HashMap<StreamId, Option<File>>,
Expand Down Expand Up @@ -1321,7 +1259,7 @@ mod old {

pub struct ClientRunner<'a> {
local_addr: SocketAddr,
socket: &'a UdpSocket,
socket: &'a udp::Socket,
client: Connection,
handler: HandlerOld<'a>,
timeout: Option<Pin<Box<Sleep>>>,
Expand All @@ -1331,7 +1269,7 @@ mod old {
impl<'a> ClientRunner<'a> {
pub fn new(
args: &'a Args,
socket: &'a UdpSocket,
socket: &'a udp::Socket,
local_addr: SocketAddr,
remote_addr: SocketAddr,
origin: &str,
Expand Down Expand Up @@ -1394,7 +1332,7 @@ mod old {

match ready(self.socket, self.timeout.as_mut()).await? {
Ready::Socket => loop {
let dgram = read_dgram(self.socket, &self.local_addr)?;
let dgram = self.socket.recv(&self.local_addr)?;
if dgram.is_none() {
break;
}
Expand Down Expand Up @@ -1430,7 +1368,8 @@ mod old {
loop {
match self.client.process(dgram.take(), Instant::now()) {
Output::Datagram(dgram) => {
emit_datagram(self.socket, dgram).await?;
self.socket.writable().await?;
self.socket.send(dgram)?;
}
Output::Callback(new_timeout) => {
qinfo!("Setting timeout of {:?}", new_timeout);
Expand Down
3 changes: 3 additions & 0 deletions neqo-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@ enum-map = "2.7"
env_logger = { version = "0.10", default-features = false }
log = { version = "0.4", default-features = false }
qlog = "0.12"
quinn-udp = { git = "https://github.com/quinn-rs/quinn/", rev = "a947962131aba8a6521253d03cc948b20098a2d6", optional = true }
time = { version = "0.3", features = ["formatting"] }
tokio = { version = "1", features = ["net", "time", "macros", "rt", "rt-multi-thread"], optional = true }

[dev-dependencies]
test-fixture = { path = "../test-fixture" }

[features]
ci = []
udp = ["dep:quinn-udp", "dep:tokio"]

[target."cfg(windows)".dependencies.winapi]
version = "0.3"
Expand Down
5 changes: 5 additions & 0 deletions neqo-common/src/datagram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ impl Datagram {
pub fn ttl(&self) -> Option<u8> {
self.ttl
}

#[must_use]
pub(crate) fn into_data(self) -> Vec<u8> {
self.d
}
Comment on lines +57 to +60
Copy link
Collaborator Author

@mxinden mxinden Feb 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needed to convert a Datagram to a Bytes without allocation. Used within neqo_common::udp, i.e. not exposed beyond the crate.

}

impl Deref for Datagram {
Expand Down
2 changes: 2 additions & 0 deletions neqo-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ pub mod log;
pub mod qlog;
pub mod timer;
pub mod tos;
#[cfg(feature = "udp")]
pub mod udp;
Comment on lines +20 to +21
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Long term, especially since neqo_common::udp should never land in Firefox, I would prefer combining neqo-client, neqo-server and neqo_common::udp into one crate.

That said, I suggest doing so in a separate pull request. Thoughts?

Copy link
Collaborator

@larseggert larseggert Feb 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good to me. Also check if neqo-interop should be part of that refactor?


use std::fmt::Write;

Expand Down
35 changes: 35 additions & 0 deletions neqo-common/src/tos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@
}
}

impl From<IpTos> for IpTosEcn {
fn from(value: IpTos) -> Self {
IpTosEcn::from(value.0 & 0x3)
}
}

/// Diffserv Codepoints, mapped to the upper six bits of the TOS field.
/// <https://www.iana.org/assignments/dscp-registry/dscp-registry.xhtml>
#[derive(Copy, Clone, PartialEq, Eq, Enum, Default, Debug)]
Expand Down Expand Up @@ -159,6 +165,12 @@
}
}

impl From<IpTos> for IpTosDscp {
fn from(value: IpTos) -> Self {
IpTosDscp::from(value.0 & 0xfc)
}

Check warning on line 171 in neqo-common/src/tos.rs

View check run for this annotation

Codecov / codecov/patch

neqo-common/src/tos.rs#L169-L171

Added lines #L169 - L171 were not covered by tests
}

/// The type-of-service field in an IP packet.
#[allow(clippy::module_name_repetitions)]
#[derive(Copy, Clone, PartialEq, Eq)]
Expand All @@ -169,22 +181,37 @@
Self(u8::from(v))
}
}

impl From<IpTosDscp> for IpTos {
fn from(v: IpTosDscp) -> Self {
Self(u8::from(v))
}
}

impl From<(IpTosDscp, IpTosEcn)> for IpTos {
fn from(v: (IpTosDscp, IpTosEcn)) -> Self {
Self(u8::from(v.0) | u8::from(v.1))
}
}

impl From<(IpTosEcn, IpTosDscp)> for IpTos {
fn from(v: (IpTosEcn, IpTosDscp)) -> Self {
Self(u8::from(v.0) | u8::from(v.1))
}
}

impl From<IpTos> for u8 {
fn from(v: IpTos) -> Self {
v.0
}
}

impl From<u8> for IpTos {
fn from(v: u8) -> Self {
Self(v)
}
}

impl Debug for IpTos {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("IpTos")
Expand Down Expand Up @@ -287,4 +314,12 @@
let iptos_dscp: IpTos = dscp.into();
assert_eq!(u8::from(iptos_dscp), dscp as u8);
}

#[test]
fn u8_to_iptos() {
let tos = 0x8b;
let iptos: IpTos = (IpTosEcn::Ce, IpTosDscp::Af41).into();
assert_eq!(tos, u8::from(iptos));
assert_eq!(IpTos::from(tos), iptos);
}
}
Loading