Skip to content

Commit 71ddf40

Browse files
authored
feat: add websocket permessage-deflate (#559)
1 parent 8afd772 commit 71ddf40

6 files changed

+437
-33
lines changed

include/cinatra/coro_http_client.hpp

+131-23
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121
#include "async_simple/Unit.h"
2222
#include "async_simple/coro/FutureAwaiter.h"
2323
#include "async_simple/coro/Lazy.h"
24+
#ifdef CINATRA_ENABLE_GZIP
25+
#include "gzip.hpp"
26+
#endif
2427
#include "cinatra_log_wrapper.hpp"
2528
#include "http_parser.hpp"
2629
#include "multipart.hpp"
@@ -273,6 +276,12 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
273276
return std::move(body_);
274277
}
275278

279+
#ifdef CINATRA_ENABLE_GZIP
280+
void set_ws_deflate(bool enable_ws_deflate) {
281+
enable_ws_deflate_ = enable_ws_deflate;
282+
}
283+
#endif
284+
276285
// only make socket connet(or handshake) to the host
277286
async_simple::coro::Lazy<resp_data> connect(std::string uri) {
278287
resp_data data{};
@@ -298,10 +307,30 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
298307
}
299308
add_header("Sec-WebSocket-Key", ws_sec_key_);
300309
add_header("Sec-WebSocket-Version", "13");
301-
310+
#ifdef CINATRA_ENABLE_GZIP
311+
if (enable_ws_deflate_)
312+
add_header("Sec-WebSocket-Extensions",
313+
"permessage-deflate; client_max_window_bits");
314+
#endif
302315
req_context<> ctx{};
303316
data = co_await async_request(std::move(uri), http_method::GET,
304317
std::move(ctx));
318+
319+
#ifdef CINATRA_ENABLE_GZIP
320+
if (enable_ws_deflate_) {
321+
for (auto c : data.resp_headers) {
322+
if (c.name == "Sec-WebSocket-Extensions") {
323+
if (c.value.find("permessage-deflate;") != std::string::npos) {
324+
is_server_support_ws_deflate_ = true;
325+
}
326+
else {
327+
is_server_support_ws_deflate_ = false;
328+
}
329+
break;
330+
}
331+
}
332+
}
333+
#endif
305334
co_return data;
306335
}
307336
data = co_await connect(u);
@@ -382,37 +411,91 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
382411
}
383412

384413
if constexpr (is_span_v<Source>) {
385-
std::string encode_header = ws.encode_frame(source, op, true);
386-
std::vector<asio::const_buffer> buffers{
387-
asio::buffer(encode_header.data(), encode_header.size()),
388-
asio::buffer(source.data(), source.size())};
389-
390-
auto [ec, _] = co_await async_write(buffers);
391-
if (ec) {
392-
data.net_err = ec;
393-
data.status = 404;
414+
#ifdef CINATRA_ENABLE_GZIP
415+
if (enable_ws_deflate_ && is_server_support_ws_deflate_) {
416+
std::string dest_buf;
417+
if (cinatra::gzip_codec::deflate({source.data(), source.size()},
418+
dest_buf)) {
419+
std::span<char> msg(dest_buf.data(), dest_buf.size());
420+
auto header = ws.encode_frame(msg, op, true, true);
421+
std::vector<asio::const_buffer> buffers{asio::buffer(header),
422+
asio::buffer(dest_buf)};
423+
424+
auto [ec, sz] = co_await async_write(buffers);
425+
if (ec) {
426+
data.net_err = ec;
427+
data.status = 404;
428+
}
429+
}
430+
else {
431+
CINATRA_LOG_ERROR << "compuress data error, data: "
432+
<< std::string(source.begin(), source.end());
433+
data.net_err = std::make_error_code(std::errc::protocol_error);
434+
data.status = 404;
435+
}
394436
}
395-
}
396-
else {
397-
while (true) {
398-
auto result = co_await source();
399-
400-
std::span<char> msg(result.buf.data(), result.buf.size());
401-
std::string encode_header = ws.encode_frame(msg, op, result.eof);
437+
else {
438+
#endif
439+
std::string encode_header = ws.encode_frame(source, op, true);
402440
std::vector<asio::const_buffer> buffers{
403441
asio::buffer(encode_header.data(), encode_header.size()),
404-
asio::buffer(msg.data(), msg.size())};
442+
asio::buffer(source.data(), source.size())};
405443

406444
auto [ec, _] = co_await async_write(buffers);
407445
if (ec) {
408446
data.net_err = ec;
409447
data.status = 404;
410-
break;
411448
}
449+
#ifdef CINATRA_ENABLE_GZIP
450+
}
451+
#endif
452+
}
453+
else {
454+
while (true) {
455+
auto result = co_await source();
456+
#ifdef CINATRA_ENABLE_GZIP
457+
if (enable_ws_deflate_ && is_server_support_ws_deflate_) {
458+
std::string dest_buf;
459+
if (cinatra::gzip_codec::deflate(
460+
{result.buf.data(), result.buf.size()}, dest_buf)) {
461+
std::span<char> msg(dest_buf.data(), dest_buf.size());
462+
std::string header = ws.encode_frame(msg, op, result.eof, true);
463+
std::vector<asio::const_buffer> buffers{asio::buffer(header),
464+
asio::buffer(dest_buf)};
465+
auto [ec, sz] = co_await async_write(buffers);
466+
if (ec) {
467+
data.net_err = ec;
468+
data.status = 404;
469+
}
470+
}
471+
else {
472+
CINATRA_LOG_ERROR << "compuress data error, data: "
473+
<< std::string(result.buf.data());
474+
data.net_err = std::make_error_code(std::errc::protocol_error);
475+
data.status = 404;
476+
}
477+
}
478+
else {
479+
#endif
480+
std::span<char> msg(result.buf.data(), result.buf.size());
481+
std::string encode_header = ws.encode_frame(msg, op, result.eof);
482+
std::vector<asio::const_buffer> buffers{
483+
asio::buffer(encode_header.data(), encode_header.size()),
484+
asio::buffer(msg.data(), msg.size())};
412485

413-
if (result.eof) {
414-
break;
486+
auto [ec, _] = co_await async_write(buffers);
487+
if (ec) {
488+
data.net_err = ec;
489+
data.status = 404;
490+
break;
491+
}
492+
493+
if (result.eof) {
494+
break;
495+
}
496+
#ifdef CINATRA_ENABLE_GZIP
415497
}
498+
#endif
416499
}
417500
}
418501

@@ -1839,9 +1922,28 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
18391922
}
18401923
}
18411924

1842-
data.status = 200;
1843-
data.resp_body = {data_ptr, payload_len};
1925+
#ifdef CINATRA_ENABLE_GZIP
1926+
if (!is_close_frame && is_server_support_ws_deflate_ &&
1927+
enable_ws_deflate_) {
1928+
inflate_str_.clear();
1929+
if (!cinatra::gzip_codec::inflate({data_ptr, payload_len},
1930+
inflate_str_)) {
1931+
CINATRA_LOG_ERROR << "uncompuress data error";
1932+
data.status = 404;
1933+
data.net_err = std::make_error_code(std::errc::protocol_error);
1934+
co_return data;
1935+
}
1936+
data.status = 200;
1937+
data.resp_body = {inflate_str_.data(), inflate_str_.size()};
1938+
}
1939+
else {
1940+
#endif
18441941

1942+
data.status = 200;
1943+
data.resp_body = {data_ptr, payload_len};
1944+
#ifdef CINATRA_ENABLE_GZIP
1945+
}
1946+
#endif
18451947
read_buf.consume(read_buf.size());
18461948
header_size = 2;
18471949

@@ -2024,6 +2126,12 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
20242126
std::string resp_chunk_str_;
20252127
std::span<char> out_buf_;
20262128

2129+
#ifdef CINATRA_ENABLE_GZIP
2130+
bool enable_ws_deflate_ = false;
2131+
bool is_server_support_ws_deflate_ = false;
2132+
std::string inflate_str_;
2133+
#endif
2134+
20272135
#ifdef BENCHMARK_TEST
20282136
std::string req_str_;
20292137
bool stop_bench_ = false;

include/cinatra/coro_http_connection.hpp

+63-5
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121
#include "sha1.hpp"
2222
#include "string_resize.hpp"
2323
#include "websocket.hpp"
24+
#ifdef CINATRA_ENABLE_GZIP
25+
#include "gzip.hpp"
26+
#endif
2427
#include "ylt/coro_io/coro_file.hpp"
2528
#include "ylt/coro_io/coro_io.hpp"
2629

@@ -132,6 +135,14 @@ class coro_http_connection
132135
if (body_len == 0) {
133136
if (parser_.method() == "GET"sv) {
134137
if (request_.is_upgrade()) {
138+
#ifdef CINATRA_ENABLE_GZIP
139+
if (request_.is_support_compressed()) {
140+
is_client_ws_compressed_ = true;
141+
}
142+
else {
143+
is_client_ws_compressed_ = false;
144+
}
145+
#endif
135146
// websocket
136147
build_ws_handshake_head();
137148
bool ok = co_await reply(true); // response ws handshake
@@ -562,11 +573,28 @@ class coro_http_connection
562573

563574
async_simple::coro::Lazy<std::error_code> write_websocket(
564575
std::string_view msg, opcode op = opcode::text) {
565-
auto header = ws_.format_header(msg.length(), op);
566576
std::vector<asio::const_buffer> buffers;
567-
buffers.push_back(asio::buffer(header));
568-
buffers.push_back(asio::buffer(msg));
577+
std::string header;
578+
#ifdef CINATRA_ENABLE_GZIP
579+
std::string dest_buf;
580+
if (is_client_ws_compressed_ && msg.size() > 0) {
581+
if (!cinatra::gzip_codec::deflate(msg, dest_buf)) {
582+
CINATRA_LOG_ERROR << "compuress data error, data: " << msg;
583+
co_return std::make_error_code(std::errc::protocol_error);
584+
}
569585

586+
header = ws_.format_header(dest_buf.length(), op, true);
587+
buffers.push_back(asio::buffer(header));
588+
buffers.push_back(asio::buffer(dest_buf));
589+
}
590+
else {
591+
#endif
592+
header = ws_.format_header(msg.length(), op);
593+
buffers.push_back(asio::buffer(header));
594+
buffers.push_back(asio::buffer(msg));
595+
#ifdef CINATRA_ENABLE_GZIP
596+
}
597+
#endif
570598
auto [ec, sz] = co_await async_write(buffers);
571599
co_return ec;
572600
}
@@ -623,8 +651,27 @@ class coro_http_connection
623651
break;
624652
case cinatra::ws_frame_type::WS_TEXT_FRAME:
625653
case cinatra::ws_frame_type::WS_BINARY_FRAME: {
626-
result.eof = true;
627-
result.data = {payload.data(), payload.size()};
654+
#ifdef CINATRA_ENABLE_GZIP
655+
if (is_client_ws_compressed_) {
656+
inflate_str_.clear();
657+
if (!cinatra::gzip_codec::inflate(
658+
{payload.data(), payload.size()}, inflate_str_)) {
659+
CINATRA_LOG_ERROR << "uncompuress data error";
660+
result.ec = std::make_error_code(std::errc::protocol_error);
661+
break;
662+
}
663+
result.eof = true;
664+
result.data = {inflate_str_.data(), inflate_str_.size()};
665+
break;
666+
}
667+
else {
668+
#endif
669+
result.eof = true;
670+
result.data = {payload.data(), payload.size()};
671+
break;
672+
#ifdef CINATRA_ENABLE_GZIP
673+
}
674+
#endif
628675
} break;
629676
case cinatra::ws_frame_type::WS_CLOSE_FRAME: {
630677
close_frame close_frame =
@@ -811,6 +858,12 @@ class coro_http_connection
811858
response_.add_header("Connection", "Upgrade");
812859
response_.add_header("Sec-WebSocket-Accept", std::string(accept_key, 28));
813860
auto protocal_str = request_.get_header_value("sec-websocket-protocol");
861+
#ifdef CINATRA_ENABLE_GZIP
862+
if (is_client_ws_compressed_) {
863+
response_.add_header("Sec-WebSocket-Extensions",
864+
"permessage-deflate; client_no_context_takeover");
865+
}
866+
#endif
814867
if (!protocal_str.empty()) {
815868
response_.add_header("Sec-WebSocket-Protocol", std::string(protocal_str));
816869
}
@@ -837,6 +890,11 @@ class coro_http_connection
837890
uint64_t max_part_size_ = 8 * 1024 * 1024;
838891
std::string resp_str_;
839892

893+
#ifdef CINATRA_ENABLE_GZIP
894+
bool is_client_ws_compressed_ = false;
895+
std::string inflate_str_;
896+
#endif
897+
840898
websocket ws_;
841899
#ifdef CINATRA_ENABLE_SSL
842900
std::unique_ptr<asio::ssl::context> ssl_ctx_ = nullptr;

include/cinatra/coro_http_request.hpp

+8
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,14 @@ class coro_http_request {
208208
return true;
209209
}
210210

211+
bool is_support_compressed() {
212+
auto extension_str = get_header_value("Sec-WebSocket-Extensions");
213+
if (extension_str.find("permessage-deflate") != std::string::npos) {
214+
return true;
215+
}
216+
return false;
217+
}
218+
211219
void set_aspect_data(std::string data) {
212220
aspect_data_.push_back(std::move(data));
213221
}

0 commit comments

Comments
 (0)