diff --git a/runner/src/scenario/ceramic/mod.rs b/runner/src/scenario/ceramic/mod.rs index e8136d18..012f6ca0 100644 --- a/runner/src/scenario/ceramic/mod.rs +++ b/runner/src/scenario/ceramic/mod.rs @@ -164,7 +164,7 @@ impl From for CeramicScenarioParameters { model_reuse: ReuseType::Shared, model_instance_reuse: ReuseType::PerUser, number_of_documents: 0, - store_mids: false, + store_mids: false, }, Scenario::CeramicQuery => Self { did_type: DidType::Shared, diff --git a/runner/src/scenario/ceramic/new_streams.rs b/runner/src/scenario/ceramic/new_streams.rs index aaba6fdc..8ac2992c 100644 --- a/runner/src/scenario/ceramic/new_streams.rs +++ b/runner/src/scenario/ceramic/new_streams.rs @@ -126,7 +126,7 @@ pub async fn small_large_scenario( let redis_cli = get_redis_client().await.unwrap(); let multiplexed_conn = redis_cli.get_multiplexed_tokio_connection().await.unwrap(); let shared_conn = Arc::new(Mutex::new(multiplexed_conn)); - + let config = CeramicModelInstanceTestUser::prep_scenario(params.clone()) .await .unwrap(); @@ -145,11 +145,7 @@ pub async fn small_large_scenario( let conn_clone = instantiate_small_model_conn.clone(); Box::pin(async move { let mut conn = conn_clone.lock().await; - instantiate_small_model( - user, - params.store_mids, - &mut *conn - ).await + instantiate_small_model(user, params.store_mids, &mut *conn).await }) })) .set_name("instantiate_small_model"); @@ -210,7 +206,11 @@ pub async fn benchmark_scenario( .register_transaction(after_metrics)) } -async fn instantiate_small_model(user: &mut GooseUser, store_in_redis: bool, conn: &mut MultiplexedConnection) -> TransactionResult { +async fn instantiate_small_model( + user: &mut GooseUser, + store_in_redis: bool, + conn: &mut MultiplexedConnection, +) -> TransactionResult { let user_data = CeramicModelInstanceTestUser::user_data(user).to_owned(); let response = ModelInstanceRequests::create_model_instance( user, @@ -222,12 +222,19 @@ async fn instantiate_small_model(user: &mut GooseUser, store_in_redis: bool, con .await?; if store_in_redis { let stream_id_string = response.to_string(); - let _: () = conn.sadd(format!("anchor_mids"), stream_id_string).await.unwrap(); + let _: () = conn + .sadd(format!("anchor_mids"), stream_id_string) + .await + .unwrap(); } Ok(()) } -async fn instantiate_large_model(user: &mut GooseUser, store_in_redis: bool, conn: &mut MultiplexedConnection) -> TransactionResult { +async fn instantiate_large_model( + user: &mut GooseUser, + store_in_redis: bool, + conn: &mut MultiplexedConnection, +) -> TransactionResult { let user_data = CeramicModelInstanceTestUser::user_data(user).to_owned(); let response = ModelInstanceRequests::create_model_instance( user, @@ -239,7 +246,10 @@ async fn instantiate_large_model(user: &mut GooseUser, store_in_redis: bool, con .await?; if store_in_redis { let stream_id_string = response.to_string(); - let _: () = conn.sadd(format!("anchor_mids"), stream_id_string).await.unwrap(); + let _: () = conn + .sadd(format!("anchor_mids"), stream_id_string) + .await + .unwrap(); } Ok(()) } diff --git a/runner/src/simulate.rs b/runner/src/simulate.rs index cf5f7e03..5377f93c 100644 --- a/runner/src/simulate.rs +++ b/runner/src/simulate.rs @@ -19,7 +19,8 @@ use tracing::{error, info, warn}; use crate::{ scenario::{ - ceramic::{self, new_streams}, get_redis_client, ipfs_block_fetch, recon_sync + ceramic::{self, new_streams}, + get_redis_client, ipfs_block_fetch, recon_sync, }, utils::parse_peers_info, CommandResult, @@ -85,7 +86,6 @@ pub struct Opts { /// left to the scenario (requests per second, total requests, rps/node etc). #[arg(long, env = "SIMULATE_TARGET_REQUESTS")] target_request_rate: Option, - // #[arg(long, env = "SIMULATE_ANCHOR_WAIT_TIME")] // anchor_wait_time: Option, } @@ -531,7 +531,7 @@ impl ScenarioState { /// Should return the Minimum RPS of all peers as the f64 pub async fn validate_scenario_success( &self, - metrics: &GooseMetrics + metrics: &GooseMetrics, ) -> (CommandResult, Option) { if !self.manager { return (CommandResult::Success, None); @@ -620,74 +620,99 @@ impl ScenarioState { } } - pub async fn get_anchor_status(&self, peer: &Peer, stream_id: String) -> Result { + pub async fn get_anchor_status( + &self, + peer: &Peer, + stream_id: String, + ) -> Result { let client = reqwest::Client::new(); - let ceramic_addr = peer.ceramic_addr().ok_or_else(|| anyhow!("Peer does not have a ceramic address"))?; - + let ceramic_addr = peer + .ceramic_addr() + .ok_or_else(|| anyhow!("Peer does not have a ceramic address"))?; + let streams_url = format!("{}/{}/{}", ceramic_addr, "api/v0/streams", stream_id); - info!("Getting anchor status at URL: {}", streams_url); - - let response = client.get(streams_url).send().await.map_err(|e| { - error!("HTTP request failed: {}", e); - e - })?.error_for_status().map_err(|e| { - error!("HTTP request returned unsuccessful status: {}", e); - e - })?; - + + let response = client + .get(streams_url) + .send() + .await + .map_err(|e| { + error!("HTTP request failed: {}", e); + e + })? + .error_for_status() + .map_err(|e| { + error!("HTTP request returned unsuccessful status: {}", e); + e + })?; + let response_json = response.json::().await.map_err(|e| { error!("Failed to parse response as JSON: {}", e); e })?; - + if let Some(state) = response_json.get("state") { if let Some(anchor_status) = state.get("anchorStatus") { match anchor_status { serde_json::Value::String(s) => return Ok(s.clone()), serde_json::Value::Number(n) => return Ok(n.to_string()), _ => { - error!("Unexpected anchor status type in response: {}", anchor_status); - bail!("Unexpected anchor status type: expected string or number, got: {:?}", anchor_status); + error!( + "Unexpected anchor status type in response: {}", + anchor_status + ); + bail!( + "Unexpected anchor status type: expected string or number, got: {:?}", + anchor_status + ); } } } } - + error!("Anchor status not found in the response JSON"); bail!("Anchor status missing in the response") } - pub async fn get_set_from_redis(&self, key: &str) -> Result, anyhow::Error> { // Create a new Redis client let client: redis::Client = get_redis_client().await?; let mut connection = client.get_async_connection().await?; // Get the MIDs from Redis - let response = redis::cmd("SMEMBERS").arg(key).query_async(&mut connection).await?; + let response = redis::cmd("SMEMBERS") + .arg(key) + .query_async(&mut connection) + .await?; // Return the MIDs Ok(response) } - pub async fn remove_stream_from_redis(&self, key: &str, stream_ids: Vec) -> Result { + pub async fn remove_stream_from_redis( + &self, + key: &str, + stream_ids: Vec, + ) -> Result { let client: redis::Client = get_redis_client().await?; let mut connection = client.get_async_connection().await?; - let response = redis::cmd("SREM").arg(key).arg(stream_ids).query_async(&mut connection).await?; + let response = redis::cmd("SREM") + .arg(key) + .arg(stream_ids) + .query_async(&mut connection) + .await?; Ok(response) } async fn validate_anchoring_banchmark_scenario_success( &self, ) -> Result<(CommandResult, Option), anyhow::Error> { - let mut anchored_count = 0; let mut pending_count = 0; let mut failed_count = 0; let mut not_requested_count = 0; - let wait_duration = Duration::from_secs(20 * 60); + let wait_duration = Duration::from_secs(60 * 60); // TODO_3164_1 : Make this a parameter, pass it in from the scenario config - // TODO_3164_2 : Code clean-up : Remove all info logs used for debugging // TODO_3164_3 : Code clean-up : Move redis calls to separate file move it out of simulate.rs - // TODO_3164_4 : Code clean-up : Move api call (fetch stream) to model_instance.rs?, maybe rename it + // TODO_3164_4 : Code clean-up : Move api call (fetch stream) to model_instance.rs?, maybe rename it sleep(wait_duration).await; // Pick a peer at random @@ -697,19 +722,16 @@ impl ScenarioState { info!("Number of MIDs: {}", ids.len()); // Make an API call to get the status of request from the chosen peer - // This assumes the existence of a function `get_anchor_status` which is not defined in the provided snippet for stream_id in ids.clone() { info!("Fetching anchorstatus for streamID {}", stream_id); match self.get_anchor_status(peer, stream_id.clone()).await { - Ok(status) => { - match status.as_str() { - "ANCHORED" => anchored_count += 1, - "PENDING" => pending_count += 1, - "FAILED" => failed_count += 1, - "NOT_REQUESTED" => not_requested_count += 1, - _ => info!("Unknown anchor status: {}", status), - } - } + Ok(status) => match status.as_str() { + "ANCHORED" => anchored_count += 1, + "PENDING" => pending_count += 1, + "FAILED" => failed_count += 1, + "NOT_REQUESTED" => not_requested_count += 1, + _ => info!("Unknown anchor status: {}", status), + }, Err(e) => { failed_count += 1; error!("Failed to get anchor status: {}", e); @@ -728,21 +750,21 @@ impl ScenarioState { info!("Pending count: {:2}", pending_count); info!("Failed count: {:2}", failed_count); - // TODO_3164_6 : Remove this logic if not needed. If it gives us something better keep it + // TODO_3164_6 : Logic to caluclate RPS, remove it if not needed let after_metrics = self - .combine_metrics( - vec![ - CERAMIC_CAS_SUCCESS_METRIC_NAME.to_string(), - CERAMIC_CAS_FAILED_METRIC_NAME.to_string(), - ], - &["success", "failure"], - PROM_METRICS_PATH, - ) - .await?; - let before_metrics = self.before_metrics.as_ref().unwrap(); + .combine_metrics( + vec![ + CERAMIC_CAS_SUCCESS_METRIC_NAME.to_string(), + CERAMIC_CAS_FAILED_METRIC_NAME.to_string(), + ], + &["success", "failure"], + PROM_METRICS_PATH, + ) + .await?; + let before_metrics = self.before_metrics.as_ref().unwrap(); - // Calculate success and failure counts - let (success_count, failure_count): (u64, u64) = after_metrics + // Calculate success and failure counts + let (success_count, failure_count): (u64, u64) = after_metrics .iter() .zip(before_metrics) .fold((0, 0), |(success_acc, failure_acc), (after, before)| { @@ -776,14 +798,18 @@ impl ScenarioState { info!("Success RPS: {:.2}", success_rps); info!("Failure RPS: {:.2}", failure_rps); - // Determine success based on the anchored_count - if anchored_count > 1000000 { - Ok((CommandResult::Success, None)) + if failed_count > 0 { + Ok((CommandResult::Failure(anyhow!("Failed count is greater than 0")), None)) } else { - Ok((CommandResult::Failure(anyhow!("Anchored count {} is less than the success threshold of 1000000", anchored_count)), None)) + Ok(( + CommandResult::Success(anyhow!( + "Anchored count is : {}", + anchored_count + )), + None, + )) } - } /// Removed from `validate_scenario_success` to make testing easier as constructing the GooseMetrics appropriately is difficult