diff --git a/operator/src/network/cas.rs b/operator/src/network/cas.rs index c3a6d08..4734323 100644 --- a/operator/src/network/cas.rs +++ b/operator/src/network/cas.rs @@ -1,4 +1,4 @@ -use std::collections::BTreeMap; +use std::{cmp, collections::BTreeMap}; use k8s_openapi::{ api::{ @@ -317,6 +317,14 @@ pub fn cas_stateful_set_spec( ] .concat(); + let num_api_cpus = config + .cas_resource_limits + .cpu + .clone() + .and_then(|cpu_quantity| quantity_to_milli(&cpu_quantity)) + .map(|cpu_millis| cmp::max(cpu_millis / 1000, 1)) + .unwrap_or(1); + let mut cas_api_env = [ cas_node_env.clone(), vec![ @@ -335,6 +343,11 @@ pub fn cas_stateful_set_spec( value: Some(DEFAULT_METRICS_PORT.to_string()), ..Default::default() }, + EnvVar { + name: "MULTIPROCESS_SIZE".to_owned(), + value: Some(num_api_cpus.to_string()), + ..Default::default() + }, ], ] .concat(); @@ -548,6 +561,16 @@ pub fn cas_stateful_set_spec( ..Default::default() } } + +fn quantity_to_milli(quantity: &Quantity) -> Option { + if let Some(value) = quantity.0.strip_suffix('m') { + return value.parse::().ok(); + } else if let Ok(value) = quantity.0.parse::() { + return Some((value * 1000.0) as i64); + } + None +} + pub fn cas_service_spec() -> ServiceSpec { ServiceSpec { ports: Some(vec![ServicePort { diff --git a/operator/src/network/controller.rs b/operator/src/network/controller.rs index 067fbd0..fa8a05b 100644 --- a/operator/src/network/controller.rs +++ b/operator/src/network/controller.rs @@ -2848,7 +2848,7 @@ mod tests { stub.cas_stateful_set.patch(expect![[r#" --- original +++ modified - @@ -134,8 +134,8 @@ + @@ -138,8 +138,8 @@ "value": "http://localstack:4566/000000000000/cas-anchor-dev-" } ], @@ -2859,7 +2859,7 @@ mod tests { "name": "cas-api", "ports": [ { - @@ -276,8 +276,8 @@ + @@ -280,8 +280,8 @@ "value": "1000" } ], @@ -2870,7 +2870,7 @@ mod tests { "name": "cas-worker", "resources": { "limits": { - @@ -446,8 +446,8 @@ + @@ -450,8 +450,8 @@ "value": "dev" } ], @@ -2962,7 +2962,7 @@ mod tests { stub.cas_stateful_set.patch(expect![[r#" --- original +++ modified - @@ -144,12 +144,12 @@ + @@ -148,12 +148,12 @@ ], "resources": { "limits": { @@ -2977,7 +2977,7 @@ mod tests { "ephemeral-storage": "1Gi", "memory": "1Gi" } - @@ -281,12 +281,12 @@ + @@ -285,12 +285,12 @@ "name": "cas-worker", "resources": { "limits": { @@ -2992,7 +2992,7 @@ mod tests { "ephemeral-storage": "1Gi", "memory": "1Gi" } - @@ -369,12 +369,12 @@ + @@ -373,12 +373,12 @@ "name": "cas-scheduler", "resources": { "limits": { @@ -3007,7 +3007,7 @@ mod tests { "ephemeral-storage": "1Gi", "memory": "1Gi" } - @@ -474,7 +474,7 @@ + @@ -478,7 +478,7 @@ ], "resources": { "requests": { @@ -4702,6 +4702,99 @@ mod tests { timeout_after_1s(mocksrv).await; } #[tokio::test] + async fn cas_api_env_cpu_override() { + // Setup network spec and status + let network = Network::test().with_spec(NetworkSpec { + cas: Some(CasSpec { + cas_resource_limits: Some(ResourceLimitsSpec { + cpu: Some(Quantity("2.5".to_owned())), + ..Default::default() + }), + ..Default::default() + }), + ..Default::default() + }); + let mock_rpc_client = default_ipfs_rpc_mock(); + let mut stub = Stub::default().with_network(network.clone()); + stub.cas_stateful_set.patch(expect![[r#" + --- original + +++ modified + @@ -119,7 +119,7 @@ + }, + { + "name": "MULTIPROCESS_SIZE", + - "value": "1" + + "value": "2" + }, + { + "name": "NODE_ENV", + @@ -148,14 +148,12 @@ + ], + "resources": { + "limits": { + - "cpu": "250m", + - "ephemeral-storage": "1Gi", + - "memory": "1Gi" + + "cpu": "2.5", + + "ephemeral-storage": "1Gi" + }, + "requests": { + - "cpu": "250m", + - "ephemeral-storage": "1Gi", + - "memory": "1Gi" + + "cpu": "2.5", + + "ephemeral-storage": "1Gi" + } + } + }, + @@ -285,14 +283,12 @@ + "name": "cas-worker", + "resources": { + "limits": { + - "cpu": "250m", + - "ephemeral-storage": "1Gi", + - "memory": "1Gi" + + "cpu": "2.5", + + "ephemeral-storage": "1Gi" + }, + "requests": { + - "cpu": "250m", + - "ephemeral-storage": "1Gi", + - "memory": "1Gi" + + "cpu": "2.5", + + "ephemeral-storage": "1Gi" + } + } + }, + @@ -373,14 +369,12 @@ + "name": "cas-scheduler", + "resources": { + "limits": { + - "cpu": "250m", + - "ephemeral-storage": "1Gi", + - "memory": "1Gi" + + "cpu": "2.5", + + "ephemeral-storage": "1Gi" + }, + "requests": { + - "cpu": "250m", + - "ephemeral-storage": "1Gi", + - "memory": "1Gi" + + "cpu": "2.5", + + "ephemeral-storage": "1Gi" + } + } + } + "#]]); + let (testctx, api_handle) = Context::test(mock_rpc_client); + let fakeserver = ApiServerVerifier::new(api_handle); + let mocksrv = stub.run(fakeserver); + reconcile(Arc::new(network), testctx) + .await + .expect("reconciler"); + timeout_after_1s(mocksrv).await; + } + #[tokio::test] #[traced_test] async fn migration_cmd() { // Setup network spec and status diff --git a/operator/src/network/testdata/default_stubs/cas_stateful_set b/operator/src/network/testdata/default_stubs/cas_stateful_set index 72f4f9c..230b1db 100644 --- a/operator/src/network/testdata/default_stubs/cas_stateful_set +++ b/operator/src/network/testdata/default_stubs/cas_stateful_set @@ -117,6 +117,10 @@ Request { "name": "METRICS_PORT", "value": "9464" }, + { + "name": "MULTIPROCESS_SIZE", + "value": "1" + }, { "name": "NODE_ENV", "value": "dev"