Skip to content

Commit

Permalink
chore: TODO_2: remove logs + fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
Samika Kashyap committed Apr 16, 2024
1 parent d3ee1a7 commit 78dd368
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 67 deletions.
2 changes: 1 addition & 1 deletion runner/src/scenario/ceramic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl From<Scenario> for CeramicScenarioParameters {
model_reuse: ReuseType::Shared,
model_instance_reuse: ReuseType::PerUser,
number_of_documents: 0,
store_mids: false,
store_mids: false,
},
Scenario::CeramicQuery => Self {
did_type: DidType::Shared,
Expand Down
30 changes: 20 additions & 10 deletions runner/src/scenario/ceramic/new_streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ pub async fn small_large_scenario(
let redis_cli = get_redis_client().await.unwrap();
let multiplexed_conn = redis_cli.get_multiplexed_tokio_connection().await.unwrap();
let shared_conn = Arc::new(Mutex::new(multiplexed_conn));

let config = CeramicModelInstanceTestUser::prep_scenario(params.clone())
.await
.unwrap();
Expand All @@ -145,11 +145,7 @@ pub async fn small_large_scenario(
let conn_clone = instantiate_small_model_conn.clone();
Box::pin(async move {
let mut conn = conn_clone.lock().await;
instantiate_small_model(
user,
params.store_mids,
&mut *conn
).await
instantiate_small_model(user, params.store_mids, &mut *conn).await
})
}))
.set_name("instantiate_small_model");
Expand Down Expand Up @@ -210,7 +206,11 @@ pub async fn benchmark_scenario(
.register_transaction(after_metrics))
}

async fn instantiate_small_model(user: &mut GooseUser, store_in_redis: bool, conn: &mut MultiplexedConnection) -> TransactionResult {
async fn instantiate_small_model(
user: &mut GooseUser,
store_in_redis: bool,
conn: &mut MultiplexedConnection,
) -> TransactionResult {
let user_data = CeramicModelInstanceTestUser::user_data(user).to_owned();
let response = ModelInstanceRequests::create_model_instance(
user,
Expand All @@ -222,12 +222,19 @@ async fn instantiate_small_model(user: &mut GooseUser, store_in_redis: bool, con
.await?;
if store_in_redis {
let stream_id_string = response.to_string();
let _: () = conn.sadd(format!("anchor_mids"), stream_id_string).await.unwrap();
let _: () = conn
.sadd(format!("anchor_mids"), stream_id_string)
.await
.unwrap();
}
Ok(())
}

async fn instantiate_large_model(user: &mut GooseUser, store_in_redis: bool, conn: &mut MultiplexedConnection) -> TransactionResult {
async fn instantiate_large_model(
user: &mut GooseUser,
store_in_redis: bool,
conn: &mut MultiplexedConnection,
) -> TransactionResult {
let user_data = CeramicModelInstanceTestUser::user_data(user).to_owned();
let response = ModelInstanceRequests::create_model_instance(
user,
Expand All @@ -239,7 +246,10 @@ async fn instantiate_large_model(user: &mut GooseUser, store_in_redis: bool, con
.await?;
if store_in_redis {
let stream_id_string = response.to_string();
let _: () = conn.sadd(format!("anchor_mids"), stream_id_string).await.unwrap();
let _: () = conn
.sadd(format!("anchor_mids"), stream_id_string)
.await
.unwrap();
}
Ok(())
}
Expand Down
138 changes: 82 additions & 56 deletions runner/src/simulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use tracing::{error, info, warn};

use crate::{
scenario::{
ceramic::{self, new_streams}, get_redis_client, ipfs_block_fetch, recon_sync
ceramic::{self, new_streams},
get_redis_client, ipfs_block_fetch, recon_sync,
},
utils::parse_peers_info,
CommandResult,
Expand Down Expand Up @@ -85,7 +86,6 @@ pub struct Opts {
/// left to the scenario (requests per second, total requests, rps/node etc).
#[arg(long, env = "SIMULATE_TARGET_REQUESTS")]
target_request_rate: Option<usize>,

// #[arg(long, env = "SIMULATE_ANCHOR_WAIT_TIME")]
// anchor_wait_time: Option<u64>,
}
Expand Down Expand Up @@ -531,7 +531,7 @@ impl ScenarioState {
/// Should return the Minimum RPS of all peers as the f64
pub async fn validate_scenario_success(
&self,
metrics: &GooseMetrics
metrics: &GooseMetrics,
) -> (CommandResult, Option<PeerRps>) {
if !self.manager {
return (CommandResult::Success, None);
Expand Down Expand Up @@ -620,74 +620,99 @@ impl ScenarioState {
}
}

pub async fn get_anchor_status(&self, peer: &Peer, stream_id: String) -> Result<String, anyhow::Error> {
pub async fn get_anchor_status(
&self,
peer: &Peer,
stream_id: String,
) -> Result<String, anyhow::Error> {
let client = reqwest::Client::new();
let ceramic_addr = peer.ceramic_addr().ok_or_else(|| anyhow!("Peer does not have a ceramic address"))?;

let ceramic_addr = peer
.ceramic_addr()
.ok_or_else(|| anyhow!("Peer does not have a ceramic address"))?;

let streams_url = format!("{}/{}/{}", ceramic_addr, "api/v0/streams", stream_id);
info!("Getting anchor status at URL: {}", streams_url);

let response = client.get(streams_url).send().await.map_err(|e| {
error!("HTTP request failed: {}", e);
e
})?.error_for_status().map_err(|e| {
error!("HTTP request returned unsuccessful status: {}", e);
e
})?;


let response = client
.get(streams_url)
.send()
.await
.map_err(|e| {
error!("HTTP request failed: {}", e);
e
})?
.error_for_status()
.map_err(|e| {
error!("HTTP request returned unsuccessful status: {}", e);
e
})?;

let response_json = response.json::<serde_json::Value>().await.map_err(|e| {
error!("Failed to parse response as JSON: {}", e);
e
})?;

if let Some(state) = response_json.get("state") {
if let Some(anchor_status) = state.get("anchorStatus") {
match anchor_status {
serde_json::Value::String(s) => return Ok(s.clone()),
serde_json::Value::Number(n) => return Ok(n.to_string()),
_ => {
error!("Unexpected anchor status type in response: {}", anchor_status);
bail!("Unexpected anchor status type: expected string or number, got: {:?}", anchor_status);
error!(
"Unexpected anchor status type in response: {}",
anchor_status
);
bail!(
"Unexpected anchor status type: expected string or number, got: {:?}",
anchor_status
);
}
}
}
}

error!("Anchor status not found in the response JSON");
bail!("Anchor status missing in the response")
}


pub async fn get_set_from_redis(&self, key: &str) -> Result<Vec<String>, anyhow::Error> {
// Create a new Redis client
let client: redis::Client = get_redis_client().await?;
let mut connection = client.get_async_connection().await?;
// Get the MIDs from Redis
let response = redis::cmd("SMEMBERS").arg(key).query_async(&mut connection).await?;
let response = redis::cmd("SMEMBERS")
.arg(key)
.query_async(&mut connection)
.await?;
// Return the MIDs
Ok(response)
}

pub async fn remove_stream_from_redis(&self, key: &str, stream_ids: Vec<String>) -> Result<i32, anyhow::Error> {
pub async fn remove_stream_from_redis(
&self,
key: &str,
stream_ids: Vec<String>,
) -> Result<i32, anyhow::Error> {
let client: redis::Client = get_redis_client().await?;
let mut connection = client.get_async_connection().await?;
let response = redis::cmd("SREM").arg(key).arg(stream_ids).query_async(&mut connection).await?;
let response = redis::cmd("SREM")
.arg(key)
.arg(stream_ids)
.query_async(&mut connection)
.await?;
Ok(response)
}

async fn validate_anchoring_banchmark_scenario_success(
&self,
) -> Result<(CommandResult, Option<PeerRps>), anyhow::Error> {

let mut anchored_count = 0;
let mut pending_count = 0;
let mut failed_count = 0;
let mut not_requested_count = 0;
let wait_duration = Duration::from_secs(20 * 60);
let wait_duration = Duration::from_secs(60 * 60);
// TODO_3164_1 : Make this a parameter, pass it in from the scenario config
// TODO_3164_2 : Code clean-up : Remove all info logs used for debugging
// TODO_3164_3 : Code clean-up : Move redis calls to separate file move it out of simulate.rs
// TODO_3164_4 : Code clean-up : Move api call (fetch stream) to model_instance.rs?, maybe rename it
// TODO_3164_4 : Code clean-up : Move api call (fetch stream) to model_instance.rs?, maybe rename it
sleep(wait_duration).await;

// Pick a peer at random
Expand All @@ -697,19 +722,16 @@ impl ScenarioState {
info!("Number of MIDs: {}", ids.len());

// Make an API call to get the status of request from the chosen peer
// This assumes the existence of a function `get_anchor_status` which is not defined in the provided snippet
for stream_id in ids.clone() {
info!("Fetching anchorstatus for streamID {}", stream_id);
match self.get_anchor_status(peer, stream_id.clone()).await {
Ok(status) => {
match status.as_str() {
"ANCHORED" => anchored_count += 1,
"PENDING" => pending_count += 1,
"FAILED" => failed_count += 1,
"NOT_REQUESTED" => not_requested_count += 1,
_ => info!("Unknown anchor status: {}", status),
}
}
Ok(status) => match status.as_str() {
"ANCHORED" => anchored_count += 1,
"PENDING" => pending_count += 1,
"FAILED" => failed_count += 1,
"NOT_REQUESTED" => not_requested_count += 1,
_ => info!("Unknown anchor status: {}", status),
},
Err(e) => {
failed_count += 1;
error!("Failed to get anchor status: {}", e);
Expand All @@ -728,21 +750,21 @@ impl ScenarioState {
info!("Pending count: {:2}", pending_count);
info!("Failed count: {:2}", failed_count);

// TODO_3164_6 : Remove this logic if not needed. If it gives us something better keep it
// TODO_3164_6 : Logic to caluclate RPS, remove it if not needed
let after_metrics = self
.combine_metrics(
vec![
CERAMIC_CAS_SUCCESS_METRIC_NAME.to_string(),
CERAMIC_CAS_FAILED_METRIC_NAME.to_string(),
],
&["success", "failure"],
PROM_METRICS_PATH,
)
.await?;
let before_metrics = self.before_metrics.as_ref().unwrap();
.combine_metrics(
vec![
CERAMIC_CAS_SUCCESS_METRIC_NAME.to_string(),
CERAMIC_CAS_FAILED_METRIC_NAME.to_string(),
],
&["success", "failure"],
PROM_METRICS_PATH,
)
.await?;
let before_metrics = self.before_metrics.as_ref().unwrap();

// Calculate success and failure counts
let (success_count, failure_count): (u64, u64) = after_metrics
// Calculate success and failure counts
let (success_count, failure_count): (u64, u64) = after_metrics
.iter()
.zip(before_metrics)
.fold((0, 0), |(success_acc, failure_acc), (after, before)| {
Expand Down Expand Up @@ -776,14 +798,18 @@ impl ScenarioState {
info!("Success RPS: {:.2}", success_rps);
info!("Failure RPS: {:.2}", failure_rps);


// Determine success based on the anchored_count
if anchored_count > 1000000 {
Ok((CommandResult::Success, None))
if failed_count > 0 {
Ok((CommandResult::Failure(anyhow!("Failed count is greater than 0")), None))
} else {
Ok((CommandResult::Failure(anyhow!("Anchored count {} is less than the success threshold of 1000000", anchored_count)), None))
Ok((
CommandResult::Success(anyhow!(
"Anchored count is : {}",
anchored_count
)),
None,
))
}

}

/// Removed from `validate_scenario_success` to make testing easier as constructing the GooseMetrics appropriately is difficult
Expand Down

0 comments on commit 78dd368

Please sign in to comment.