@@ -1027,26 +1027,38 @@ var defaultUpgrader = &websocket.Upgrader{
1027
1027
Subprotocols : []string {"mqtt" },
1028
1028
}
1029
1029
1030
- //实现io.ReadWriter接口
1030
+ // 实现io.ReadWriter接口
1031
1031
// wsConn implements the io.readWriter
1032
1032
type wsConn struct {
1033
1033
net.Conn
1034
- c * websocket.Conn
1034
+ c * websocket.Conn
1035
+ buf []byte
1036
+ r int // buf copy positions
1035
1037
}
1036
1038
1037
1039
func (ws * wsConn ) Close () error {
1038
1040
return ws .Conn .Close ()
1039
1041
}
1040
1042
1041
1043
func (ws * wsConn ) Read (p []byte ) (n int , err error ) {
1042
- msgType , r , err := ws .c .NextReader ()
1043
- if err != nil {
1044
- return 0 , err
1044
+ if ws .buf == nil {
1045
+ msgType , buf , err := ws .c .ReadMessage ()
1046
+ if err != nil {
1047
+ return 0 , err
1048
+ }
1049
+ if msgType != websocket .BinaryMessage {
1050
+ return 0 , ErrInvalWsMsgType
1051
+ }
1052
+ ws .buf = buf
1045
1053
}
1046
- if msgType != websocket .BinaryMessage {
1047
- return 0 , ErrInvalWsMsgType
1054
+ n = copy (p , ws .buf [ws .r :])
1055
+ ws .r += n
1056
+ // reset reader buffer
1057
+ if ws .r + 1 >= len (ws .buf ) {
1058
+ ws .buf = nil
1059
+ ws .r = 0
1048
1060
}
1049
- return r . Read ( p )
1061
+ return
1050
1062
}
1051
1063
1052
1064
func (ws * wsConn ) Write (p []byte ) (n int , err error ) {
@@ -1363,7 +1375,7 @@ func (srv *server) wsHandler() http.HandlerFunc {
1363
1375
return
1364
1376
}
1365
1377
defer c .Close ()
1366
- conn := & wsConn {c .UnderlyingConn (), c }
1378
+ conn := & wsConn {Conn : c .UnderlyingConn (), c : c }
1367
1379
client , err := srv .newClient (conn )
1368
1380
if err != nil {
1369
1381
zaplog .Error ("new client fail" , zap .Error (err ))
0 commit comments