Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

空data支持 #37

Open
TheCGDF opened this issue Jun 30, 2024 · 38 comments
Open

空data支持 #37

TheCGDF opened this issue Jun 30, 2024 · 38 comments

Comments

@TheCGDF
Copy link

TheCGDF commented Jun 30, 2024

当客户端发送一个空的数据包时:

let mut buffer = [0u8; 0];
...
stream.write_all(&buffer).await.unwrap();

服务端的read会卡住,或者说,忽略这个空数据包

while let Ok(n) = stream.read(&mut buffer).await {
...
}

这导致一些兼容性问题。

以及某些kcp库的client在connect时会发送一个空数据包作为握手。

是否有办法接受空数据包?

@zonyitoo
Copy link
Collaborator

zonyitoo commented Jul 2, 2024

read() 返回 0 表示 EOF ,所以不能实现。
如果要实现握手,那应该在KCP Session层实现,即使是空数据包也可以发一个完整的KCP Segment,接收端也可以发一个ACK

@zonyitoo
Copy link
Collaborator

zonyitoo commented Jul 2, 2024

tokio_kcp/src/skcp.rs

Lines 138 to 185 in bf3466b

pub fn poll_send(&mut self, cx: &mut Context<'_>, mut buf: &[u8]) -> Poll<KcpResult<usize>> {
if self.closed {
return Err(io::Error::from(ErrorKind::BrokenPipe).into()).into();
}
// If:
// 1. Have sent the first packet (asking for conv)
// 2. Too many pending packets
if self.sent_first
&& (self.kcp.wait_snd() >= self.kcp.snd_wnd() as usize
|| self.kcp.wait_snd() >= self.kcp.rmt_wnd() as usize
|| self.kcp.waiting_conv())
{
trace!(
"[SEND] waitsnd={} sndwnd={} rmtwnd={} excceeded or waiting conv={}",
self.kcp.wait_snd(),
self.kcp.snd_wnd(),
self.kcp.rmt_wnd(),
self.kcp.waiting_conv()
);
if let Some(waker) = self.pending_sender.replace(cx.waker().clone()) {
if !cx.waker().will_wake(&waker) {
waker.wake();
}
}
return Poll::Pending;
}
if !self.sent_first && self.kcp.waiting_conv() && buf.len() > self.kcp.mss() {
buf = &buf[..self.kcp.mss()];
}
let n = self.kcp.send(buf)?;
self.sent_first = true;
if self.kcp.wait_snd() >= self.kcp.snd_wnd() as usize || self.kcp.wait_snd() >= self.kcp.rmt_wnd() as usize {
self.kcp.flush()?;
}
self.last_update = Instant::now();
if self.flush_write {
self.kcp.flush()?;
}
Ok(n).into()
}

从逻辑上看没有不允许发送 len() = 0 的数据,你是否是启用了 stream 模式?

@TheCGDF
Copy link
Author

TheCGDF commented Jul 2, 2024

从逻辑上看没有不允许发送 len() = 0 的数据,你是否是启用了 stream 模式?

没有开启stream

我用wireshark抓包试了,当send buffer为空时,tokio_kcp不会发出任何udp包。

当用其他kcp库向tokio_kcp发一个buffer为空的udp包时,stream.read不会返回

@TheCGDF
Copy link
Author

TheCGDF commented Jul 2, 2024

我看到作为依赖的kcp库中有
https://github.com/Matrix-Zhang/kcp/blob/58e863fcbdf4cdd0df9e9c378a864dd4d0c8f58f/src/kcp.rs#L505-L507
这样的逻辑,是否和这个有关(我不太确定)

@zonyitoo
Copy link
Collaborator

zonyitoo commented Jul 3, 2024

这段代码仅在 stream = true 时生效,所以问你是否有开启 stream

@TheCGDF
Copy link
Author

TheCGDF commented Jul 3, 2024

真没开。
KcpConfig::default()默认是关闭的,我也试了下面这样的显式指定:

    let config =KcpConfig {
        mtu: 1400,
        nodelay: KcpNoDelayConfig {
            nodelay: false,
            interval: 40,
            resend: 0,
            nc: false,
        },
        wnd_size: (256, 256),
        session_expire: std::time::Duration::from_secs(90),
        flush_write: true,
        flush_acks_input: true,
        stream: false,
    };

nctrue/false也都试了没影响。

@TheCGDF
Copy link
Author

TheCGDF commented Jul 3, 2024

https://github.com/Matrix-Zhang/kcp/blob/58e863fcbdf4cdd0df9e9c378a864dd4d0c8f58f/src/kcp.rs#L1246-L1249
这里还有一处判断,可能是这里?我打断点调试看到这里被跳过了

@zonyitoo
Copy link
Collaborator

zonyitoo commented Jul 3, 2024

https://github.com/Matrix-Zhang/kcp/blob/58e863fcbdf4cdd0df9e9c378a864dd4d0c8f58f/src/kcp.rs#L521-L539

至少会往 self.snd_queue 里面放 1 个 segment ,按理说不会被跳过

@TheCGDF
Copy link
Author

TheCGDF commented Jul 3, 2024

https://github.com/Matrix-Zhang/kcp/blob/58e863fcbdf4cdd0df9e9c378a864dd4d0c8f58f/src/kcp.rs#L521-L539

至少会往 self.snd_queue 里面放 1 个 segment ,按理说不会被跳过

打断点看了,空数组的时候走不到那段代码,poll_sendsend都不会被调用,但是数组里有元素的时候这两个函数就会被调用。但是我找不到哪个分支判断处理的

@TheCGDF
Copy link
Author

TheCGDF commented Jul 3, 2024

翻了tokio源码终于找到了

https://github.com/tokio-rs/tokio/blob/c8f3539bc11e57843745c68ee60ca5276248f9f9/tokio/src/io/util/write_all.rs#L42-L51

数组为空时不会调用poll_write,断点调试也验证了确实是这一个判断的区别

那我估计这是难修了,除非多一层封装先把 segment 放进去再调用poll

但是read那边也不支持空数组,还是有些头疼,难道也要连着 segment 读出来?

@zonyitoo
Copy link
Collaborator

zonyitoo commented Jul 4, 2024

read() 返回 0 是肯定不可以的,返回 0 表示的是 EOF ,不能去破坏这个约定 。要么就去另外搞一个函数去读,不用 AsyncRead trait

@TheCGDF
Copy link
Author

TheCGDF commented Jul 4, 2024

read() 返回 0 是肯定不可以的,返回 0 表示的是 EOF ,不能去破坏这个约定 。要么就去另外搞一个函数去读,不用 AsyncRead trait

那目前的实现如果不打算改的话还是在文档或者example里标记一下比较好。。。

比如len() == 0视为UB什么的,毕竟也算是一个不兼容的特性

@zonyitoo
Copy link
Collaborator

zonyitoo commented Jul 4, 2024

不需要的,因为Rust所有的API都是类POSIX标准,read() = 0 就是 EOF ,不需要特意说明

@zonyitoo
Copy link
Collaborator

zonyitoo commented Jul 4, 2024

解决办法是加个option,如果设置了的话,write 允许写空包,那不要用 write_all 就好了,用 write() 是没有这个判断的

tokio_kcp/src/stream.rs

Lines 165 to 171 in bf3466b

fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
match ready!(self.poll_send(cx, buf)) {
Ok(n) => Ok(n).into(),
Err(KcpError::IoError(err)) => Err(err).into(),
Err(err) => Err(io::Error::new(ErrorKind::Other, err)).into(),
}
}

@zonyitoo
Copy link
Collaborator

zonyitoo commented Jul 4, 2024

不过你的业务层代码也直接用 write() 不就好了?不要用 write_all()

@TheCGDF
Copy link
Author

TheCGDF commented Jul 4, 2024

还是有个比较奇怪的点,tokio的UdpSocket用的是send,而不是write,我试了下send可以发空包:

let u = UdpSocket::bind("127.0.0.1:6666".parse::<SocketAddr>().unwrap()).await.unwrap();
u.connect("127.0.0.1:5555").await.unwrap();
u.send(&[0u8;0]).await.unwrap();

也就是说socket可能应该用send/recv,而且发空包本身是一个被支持的操作。

于是我想着tokio_tcp是不是也可以用send,但是发现tokio_tcp的send无论数组是否为空,都不会发出任何包。

网络库的API应该用UdpSocketsend/recv那套吧?用文件io的read/write这套应该不太合理?

@zonyitoo
Copy link
Collaborator

zonyitoo commented Jul 6, 2024

KCP协议是一种流协议,因此与它最相似的就是 TcpStream ,因此设计了 KcpStream 。相类似的就是 TcpStream 实现了 AsyncReadAsyncWriteKcpStream 也实现了相同的接口。

底层用 UdpSocket 只是一般都这么用,实际上并没有规定一定要用 UDP 。目前我在尝试把底层实现交给用户来自定义,不一定要用 UdpSocket

对于这里提到的问题,是一种基于 KCP 实现的一种特例,它在 stream=false 时允许使用类似 Datagram 协议的方式来发包,但包的顺序又是固定的(与UDP完全不同)。因此我上面建议的是另外做两个接口 sendrecv 来支持这样的特性(仅 stream=false 时有意义)

@TheCGDF
Copy link
Author

TheCGDF commented Jul 6, 2024

KCP协议是一种流协议,因此与它最相似的就是 TcpStream

试了下TcpStream在空包上处理方式确实和UDPSocket有些不一样:

let mut t = TcpStream::connect("127.0.0.1:6666").await.unwrap();
t.write_all(&[0u8;0]).await.unwrap();

TcpStream在进入write_all之后也跳过了空包的处理,然后TcpStream被析构的时候会发一个EOF包(KcpStream好像没有这个行为?),

从而服务端TcpListener那边在TcpStream生命周期结束后才会read出来一个len() ==0的空数组(显然太迟了)

let bytes_read = stream.read(&mut buffer).unwrap();

我也觉得send/recv方案可能好些,或者加个KcpSocket(对应UdpSocket)来区分流模式与非流模式。。。

@zonyitoo
Copy link
Collaborator

zonyitoo commented Jul 6, 2024

  1. TcpStream 在析构时没有 “发出一个 EOF 包”,请了解 TCP 协议原理。
  2. KcpStream 没有这个行为,是因为 KCP 协议没有 FIN 和 RST

@TheCGDF
Copy link
Author

TheCGDF commented Jul 6, 2024

学到了。🙏

@zonyitoo
Copy link
Collaborator

当前就有 send

tokio_kcp/src/stream.rs

Lines 84 to 87 in bf3466b

/// `send` data in `buf`
pub async fn send(&mut self, buf: &[u8]) -> KcpResult<usize> {
future::poll_fn(|cx| self.poll_send(cx, buf)).await
}

recv ,可以直接用

tokio_kcp/src/stream.rs

Lines 140 to 143 in bf3466b

/// `recv` data into `buf`
pub async fn recv(&mut self, buf: &mut [u8]) -> KcpResult<usize> {
future::poll_fn(|cx| self.poll_recv(cx, buf)).await
}

@TheCGDF
Copy link
Author

TheCGDF commented Jul 10, 2024

当前就有 send

上面我说过了。。。tokio_tcp的send无论数组是否为空,都不会发出任何包。

@zonyitoo
Copy link
Collaborator

你是说 write_all 吧,send 是单独的函数

@TheCGDF
Copy link
Author

TheCGDF commented Jul 10, 2024

你是说 write_all 吧,send 是单独的函数

重新测了下,之前测错了,应该是send只有长度为0时会发出包。

    let config = KcpConfig::default();
    let server_addr = "127.0.0.1:5555".parse::<SocketAddr>().unwrap();
    let mut stream = KcpStream::connect(&config, server_addr).await.unwrap();
    stream.send(&[0u8; 10]).await.unwrap();

这里的[0u8; 10]改成[0u8; 0]就能发出包,但是改成1或者其他什么数字就发不出去。

不知是我代码哪里写错了还是什么问题?

@zonyitoo
Copy link
Collaborator

zonyitoo commented Jul 10, 2024

在代码中加了一个这样的 test ,没有问题

#[cfg(test)]
mod test {
    use crate::KcpListener;

    use super::*;

    #[tokio::test]
    async fn test_stream_echo() {
        let config = KcpConfig::default();
        let server_addr = "127.0.0.1:5555".parse::<SocketAddr>().unwrap();

        let mut listener = KcpListener::bind(config.clone(), server_addr).await.unwrap();
        let listener_hdl = tokio::spawn(async move {
            loop {
                let (mut stream, peer_addr) = listener.accept().await.unwrap();
                println!("accepted {}", peer_addr);

                tokio::spawn(async move {
                    let mut buffer = [0u8; 8192];
                    loop {
                        match stream.recv(&mut buffer).await {
                            Ok(n) => {
                                println!("server recv: {:?}", &buffer[..n]);
                                stream.send(&buffer[..n]).await.unwrap();
                                println!("server sent: {:?}", &buffer[..n]);
                            }
                            Err(err) => {
                                println!("recv error: {}", err);
                                break;
                            }
                        }
                    }
                });
            }
        });

        let mut stream = KcpStream::connect(&config, server_addr).await.unwrap();

        let test_payload = b"HELLO WORLD";
        stream.send(test_payload).await.unwrap();
        println!("client sent: {:?}", test_payload);

        let mut recv_buffer = [0u8; 1024];
        let recv_n = stream.recv(&mut recv_buffer).await.unwrap();
        println!("client recv: {:?}", &recv_buffer[..recv_n]);
        assert_eq!(recv_n, test_payload.len());
        assert_eq!(&recv_buffer[..recv_n], test_payload);

        listener_hdl.abort();
    }
}

其中会输出

client sent: [72, 69, 76, 76, 79, 32, 87, 79, 82, 76, 68]
accepted 127.0.0.1:49853
server recv: [72, 69, 76, 76, 79, 32, 87, 79, 82, 76, 68]
server sent: [72, 69, 76, 76, 79, 32, 87, 79, 82, 76, 68]
client recv: [72, 69, 76, 76, 79, 32, 87, 79, 82, 76, 68]
test stream::test::test_stream_echo ... ok

应该是你测试的问题,你直接这么测,update() Task 都还没来得及跑一次就程序就已经退出了吧。反而写 [0u8; 0] 会有问题,服务端会 recv() 不出来

@zonyitoo
Copy link
Collaborator

recv() 读不出来是因为把 0 等同于 RecvQueueEmpty 了,这个 Fix 很简单

e @ (Ok(0) | Err(KcpError::RecvQueueEmpty) | Err(KcpError::ExpectingFragment)) => {

@TheCGDF
Copy link
Author

TheCGDF commented Jul 10, 2024

确实是我测试方法不对,加了

tokio::time::sleep(std::time::Duration::from_secs(1)).await;

就能发出去了

这样看来就只剩recv()需要被修了🥲

@zonyitoo
Copy link
Collaborator

不过用发一个空的来表示一种特殊的功能,似乎并不是一种稳定靠谱的做法。但凡后面多发一个字节的数据过去,读出来就不是空的

@TheCGDF
Copy link
Author

TheCGDF commented Jul 10, 2024

不过用发一个空的来表示一种特殊的功能,似乎并不是一种稳定靠谱的做法。但凡后面多发一个字节的数据过去,读出来就不是空的

但毕竟kcp协议没有禁止。。。

还有个,当conv为0的时候,服务端会重新生成一个随机的conv,这个行为好像不是kcp标准的行为?

有次随机生成的conv正好为0,导致连不上服务器,调试了好久都没复现,翻了代码才发现tokio_kcp里面有这么个逻辑😂。

@zonyitoo
Copy link
Collaborator

本身也没规定conv要怎么生成

@TheCGDF
Copy link
Author

TheCGDF commented Jul 10, 2024

本身也没规定conv要怎么生成

主要是有这样的逻辑是不是在文档或者注释里标一下比较好。。。不然出了bug还挺难复现的。。。

@TheCGDF
Copy link
Author

TheCGDF commented Jul 19, 2024

recv在超时的时候也是返回一个n=0而不是error,接收方怎么判断是发送方发了空数据还是超时了呢?

@zonyitoo
Copy link
Collaborator

判断不了。本身读到size=0表示的就是EOF,session超时本身这里的设计是想表示为关闭,关闭就向上层返回EOF表示结束。你这里一定要读到一个size=0的数据,很难做。

@TheCGDF
Copy link
Author

TheCGDF commented Jul 19, 2024

判断不了。本身读到size=0表示的就是EOF,session超时本身这里的设计是想表示为关闭,关闭就向上层返回EOF表示结束。你这里一定要读到一个size=0的数据,很难做。

超时无法返回一个Timeout的Error吗?🥲

@zonyitoo
Copy link
Collaborator

zonyitoo commented Jul 22, 2024

可以实现,更好的办法是不是应该让你设置成不超时,由你自己来处理

@TheCGDF
Copy link
Author

TheCGDF commented Jul 22, 2024

你是指像这样↓,自己设置一个last_received来记录上一次recv的时间,然后在tokio::select里自己判断和处理超时吗?

let mut last_received = Utc::now();
tokio::select! {
    _ = async {
        //最多超时两秒,否则发送队列会被塞爆,send会被阻塞
        let ms = (last_received + Duration::seconds(2) - Utc::now()).num_milliseconds();
        tokio::time::sleep(
            if ms <= 0 {
                std::time::Duration::ZERO
            } else {
                std::time::Duration::from_millis(ms as u64)
            }
        ).await;
    } =>{
        //session timeout
        break;
    }

    result = stream.recv(&mut buffer) => {
        last_received = Utc::now();
        //handle buffer
    }
}

我个人感觉如果能提供个超时的KcpResult还是挺有用的,这样后面如果有兴趣支持try_send的话也可以派得上用场。。。

在已有session_expire这一设定的前提下,还得send/recv前再多写一层超时检查的逻辑,总感觉有点丑陋。。。

@zonyitoo
Copy link
Collaborator

zonyitoo commented Jul 28, 2024

不用那么复杂,直接创建一个 Sleep 出来,每次如果收到了消息就把它 reset 一下就好了。

实际上可能更好的是:

loop {
    let n = match tokio::time::timeout(stream.recv(&mut buffer)) {
        Ok(Ok(n)) => n,
        Ok(Err(err)) => ... // socket error,
        Err(..) => ... // timedout
    };
}

这样写不是挺优雅的,没必要 select 。

我个人感觉如果能提供个超时的KcpResult还是挺有用的,这样后面如果有兴趣支持try_send的话也可以派得上用场。。。

研究了一下写起来可能会有点丑陋,tokio::time::timeout 的方案更优。因为实际上我想要规避的是如果业务代码没有主动去写超时,那么会直接爆内存。但只要业务代码写了,session 自动超时是没有必要的。

@zonyitoo zonyitoo reopened this Jul 28, 2024
@TheCGDF
Copy link
Author

TheCGDF commented Jul 28, 2024

这样写不是挺优雅的,没必要 select 。

我之前没说清楚,用select是因为还要从(联机)游戏的主世界channelreceiver接收主世界发来的逻辑帧。

主世界每15ms更新一次逻辑帧然后通过channel推送给世界里的所有玩家。

(这种模式应该是比较普遍的,我看一些其他issue里的代码也采用了这种模式,比如:#33

let mut last_received = Utc::now();
loop{
    tokio::select! {
        //接收主世界channel每隔15ms发来的逻辑帧
        recieved = world_receiver.recv() => {
            //将逻辑帧序列化为二进制
            if tokio::time::timeout(std::time::Duration::from_secs(2), stream.send(&response)).await.is_err(){
                //逻辑帧的send只能超时两秒
                break;
            }
        }
    
        _ = async {
            //recv最多等待两秒,否则发送队列会被塞爆,send会被阻塞
            let ms = (last_received + Duration::seconds(2) - Utc::now()).num_milliseconds();
            tokio::time::sleep(
                if ms <= 0 {
                    std::time::Duration::ZERO
                } else {
                    std::time::Duration::from_millis(ms as u64)
                }
            ).await;
        } =>{
            //session timeout
            break;
        }
    
        //这里如果使用tokio::time::timeout会永远触发不了timeout
        //因为会被主世界的逻辑帧抢先
        result = stream.recv(&mut buffer) => {
            last_received = Utc::now();
            //handle buffer
        }
    }
}

一共需要手动处理两个超时

  • 发送逻辑帧时send的超时
  • 接收客户端输入recv的超时

在这种情况下,给recv套上tokio::time::timeout的方案应该是用不了的,因为主世界的channel总是会在15ms内发来消息,recvtokio::time::timeout超时永远不会触发。所以我目前能想到的方案就是弄一个不会被select中其他分支影响的last_received变量记录最后一次收到消息的时间。

tokio_kcp这边如果要设置超时的KcpResult,我猜应该也会需要弄一个last_received变量记录最后一次收到消息的时间?

tokio::time::timeout应该只适用于比较简单的一问一答?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants