diff --git a/benches/bench/main.rs b/benches/bench/main.rs index ea66c99..b618ae3 100644 --- a/benches/bench/main.rs +++ b/benches/bench/main.rs @@ -222,6 +222,7 @@ fn config() -> Config { listen_address: SUBWAY_SERVER_ADDR.to_string(), port: SUBWAY_SERVER_PORT, max_connections: 1024 * 1024, + max_batch_size: None, request_timeout_seconds: 120, http_methods: Vec::new(), cors: None, diff --git a/configs/config.yml b/configs/config.yml index 2a062b4..d62a1b4 100644 --- a/configs/config.yml +++ b/configs/config.yml @@ -17,6 +17,7 @@ extensions: port: 9944 listen_address: '0.0.0.0' max_connections: 2000 + max_batch_size: 10 http_methods: - path: /health method: system_health diff --git a/configs/eth_config.yml b/configs/eth_config.yml index 3363d77..ed5f408 100644 --- a/configs/eth_config.yml +++ b/configs/eth_config.yml @@ -14,6 +14,7 @@ extensions: port: 8545 listen_address: '0.0.0.0' max_connections: 2000 + max_batch_size: 10 cors: all middlewares: diff --git a/src/extensions/server/mod.rs b/src/extensions/server/mod.rs index 862cee3..d284f42 100644 --- a/src/extensions/server/mod.rs +++ b/src/extensions/server/mod.rs @@ -4,8 +4,8 @@ use hyper::server::conn::AddrStream; use hyper::service::Service; use hyper::service::{make_service_fn, service_fn}; use jsonrpsee::server::{ - middleware::rpc::RpcServiceBuilder, stop_channel, ws, RandomStringIdProvider, RpcModule, ServerBuilder, - ServerHandle, + middleware::rpc::RpcServiceBuilder, stop_channel, ws, BatchRequestConfig, RandomStringIdProvider, RpcModule, + ServerBuilder, ServerHandle, }; use jsonrpsee::Methods; @@ -61,6 +61,7 @@ pub struct ServerConfig { pub port: u16, pub listen_address: String, pub max_connections: u32, + pub max_batch_size: Option<u32>, #[serde(default)] pub http_methods: Vec<HttpMethodsConfig>, #[serde(default = "default_request_timeout_seconds")] @@ -176,10 +177,17 @@ impl SubwayServerBuilder { .map(|(a, b, c)| layer_fn(|s| PrometheusService::new(s, protocol, a, b, c))), ); + let batch_request_config = match 
config.max_batch_size { + Some(0) => BatchRequestConfig::Disabled, + Some(max_size) => BatchRequestConfig::Limit(max_size), + None => BatchRequestConfig::Unlimited, + }; + let service_builder = ServerBuilder::default() .set_rpc_middleware(rpc_middleware) .set_http_middleware(http_middleware) .max_connections(config.max_connections) + .set_batch_request_config(batch_request_config) .set_id_provider(RandomStringIdProvider::new(16)) .to_service_builder(); diff --git a/src/server.rs b/src/server.rs index 0404396..2f062ac 100644 --- a/src/server.rs +++ b/src/server.rs @@ -225,10 +225,9 @@ pub async fn build(config: Config) -> anyhow::Result<SubwayServerHandle> { #[cfg(test)] mod tests { use jsonrpsee::{ - core::client::ClientT, + core::{client::ClientT, params::BatchRequestBuilder}, rpc_params, - server::ServerBuilder, - server::ServerHandle, + server::{ServerBuilder, ServerHandle}, ws_client::{WsClient, WsClientBuilder}, RpcModule, }; @@ -244,7 +243,12 @@ mod tests { const PHO: &str = "call_pho"; const BAR: &str = "bar"; - async fn subway_server(endpoint: String, port: u16, request_timeout_seconds: Option<u64>) -> SubwayServerHandle { + async fn subway_server( + endpoint: String, + port: u16, + request_timeout_seconds: Option<u64>, + max_batch_size: Option<u32>, + ) -> SubwayServerHandle { let config = Config { extensions: ExtensionsConfig { client: Some(ClientConfig { @@ -255,6 +259,7 @@ mod tests { listen_address: "127.0.0.1".to_string(), port, max_connections: 1024, + max_batch_size, request_timeout_seconds: request_timeout_seconds.unwrap_or(10), http_methods: Vec::new(), cors: None, @@ -337,7 +342,7 @@ mod tests { #[tokio::test] async fn null_param_works() { let (endpoint, upstream_dummy_server_handle) = upstream_dummy_server("127.0.0.1:9955").await; - let subway_server = subway_server(endpoint, 9944, None).await; + let subway_server = subway_server(endpoint, 9944, None, None).await; let url = format!("ws://{}", subway_server.addr); let client = ws_client(&url).await; assert_eq!(BAR, 
client.request::<String, _>(PHO, rpc_params!()).await.unwrap()); @@ -349,7 +354,7 @@ async fn request_timeout() { let (endpoint, upstream_dummy_server_handle) = upstream_dummy_server("127.0.0.1:9956").await; // server with 1 second timeout - let subway_server = subway_server(endpoint, 9945, Some(1)).await; + let subway_server = subway_server(endpoint, 9945, Some(1), None).await; let url = format!("ws://{}", subway_server.addr); // client with default 60 second timeout let client = ws_client(&url).await; @@ -375,4 +380,82 @@ subway_server.handle.stop().unwrap(); upstream_dummy_server_handle.stop().unwrap(); } + + #[tokio::test] + async fn batch_requests_works() { + let (endpoint, upstream_dummy_server_handle) = upstream_dummy_server("127.0.0.1:9957").await; + + // Server with max batch size 3 + let subway_server = subway_server(endpoint, 9946, None, Some(3)).await; + let url = format!("ws://{}", subway_server.addr); + let client = ws_client(&url).await; + + // Sending 3 request in a batch + let mut batch = BatchRequestBuilder::new(); + batch.insert(PHO, rpc_params!()).unwrap(); + batch.insert(PHO, rpc_params!()).unwrap(); + batch.insert(PHO, rpc_params!()).unwrap(); + + let res = client.batch_request::<String>(batch).await.unwrap(); + assert_eq!(res.num_successful_calls(), 3); + + upstream_dummy_server_handle.stop().unwrap(); + } + + #[tokio::test] + async fn batch_requests_exceeds_max_size_errors() { + let (endpoint, upstream_dummy_server_handle) = upstream_dummy_server("127.0.0.1:9958").await; + + // Server with max batch size 3 + let subway_server = subway_server(endpoint, 9947, None, Some(3)).await; + let url = format!("ws://{}", subway_server.addr); + let client = ws_client(&url).await; + + // Sending 4 request in a batch + let mut batch = BatchRequestBuilder::new(); + batch.insert(PHO, rpc_params!()).unwrap(); + batch.insert(PHO, rpc_params!()).unwrap(); + batch.insert(PHO, rpc_params!()).unwrap(); + batch.insert(PHO, rpc_params!()).unwrap(); + + // Due 
to the limitation of jsonrpsee client implementation, + // we can't check the error message when response batch id is `null`. + // E.g. + // Raw response - `{"jsonrpc":"2.0","error":{"code":-32010,"message":"The batch request was too large","data":"Exceeded max limit of 3"},"id":null}` + // Jsonrpsee client response - `Err(RestartNeeded(InvalidRequestId(NotPendingRequest("null"))))` + // + // Checking if error is returned for now. + let res = client.batch_request::<String>(batch).await; + + assert!(res.is_err()); + + upstream_dummy_server_handle.stop().unwrap(); + } + + #[tokio::test] + async fn batch_requests_disabled_errors() { + let (endpoint, upstream_dummy_server_handle) = upstream_dummy_server("127.0.0.1:9959").await; + + // Server with max batch size 0 (disabled) + let subway_server = subway_server(endpoint, 9948, None, Some(0)).await; + let url = format!("ws://{}", subway_server.addr); + let client = ws_client(&url).await; + + // Sending 1 request in a batch + let mut batch = BatchRequestBuilder::new(); + batch.insert(PHO, rpc_params!()).unwrap(); + + // Due to the limitation of jsonrpsee client implementation, + // we can't check the error message when response batch id is `null`. + // E.g. + // Raw response - `{"jsonrpc":"2.0","error":{"code":-32005,"message":"Batched requests are not supported by this server"},"id":null}` + // Jsonrpsee client response - `Err(RestartNeeded(InvalidRequestId(NotPendingRequest("null"))))` + // + // Checking if error is returned for now. 
+ let res = client.batch_request::<String>(batch).await; + + assert!(res.is_err()); + + upstream_dummy_server_handle.stop().unwrap(); + } } diff --git a/src/tests/merge_subscription.rs b/src/tests/merge_subscription.rs index a41bb4b..21ed046 100644 --- a/src/tests/merge_subscription.rs +++ b/src/tests/merge_subscription.rs @@ -54,6 +54,7 @@ async fn merge_subscription_works() { listen_address: "0.0.0.0".to_string(), port: 0, max_connections: 10, + max_batch_size: None, request_timeout_seconds: 120, http_methods: Vec::new(), cors: None, diff --git a/src/tests/upstream.rs b/src/tests/upstream.rs index 84c38f5..8a78a90 100644 --- a/src/tests/upstream.rs +++ b/src/tests/upstream.rs @@ -36,6 +36,7 @@ async fn upstream_error_propagate() { listen_address: "0.0.0.0".to_string(), port: 0, max_connections: 10, + max_batch_size: None, request_timeout_seconds: 120, http_methods: Vec::new(), cors: None,