Skip to content

Commit

Permalink
chore: review comment suggestions for better rust coding
Browse files Browse the repository at this point in the history
  • Loading branch information
Samika Kashyap committed Jul 24, 2024
1 parent 025596d commit 4f8f9c3
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 30 deletions.
44 changes: 31 additions & 13 deletions runner/src/load_generator/gen.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use std::collections::HashMap;
use std::path::PathBuf;

use crate::load_generator::utils::generator_utils::CeramicConfig;
use crate::load_generator::utils::generator_utils::CeramicDidType;
use crate::load_generator::utils::generator_utils::CeramicScenarioParameters;
use crate::load_generator::utils::generator_utils::StableLoadUser;
use crate::load_generator::utils::{
CeramicConfig, CeramicDidType, CeramicScenarioParameters, StableLoadUser,
};
use crate::utils::parse_peers_info;
use crate::CommandResult;
use anyhow::Result;
Expand All @@ -16,16 +15,30 @@ use tokio::time::{Duration, Instant};
// TODO : Use this to invoke a particular scenario, currently we only have one
// so this is unused
/// Scenarios the week-long load simulation can run.
///
/// Currently only one scenario exists; the type is kept so new scenarios can
/// be added and selected via the `GENERATOR_SCENARIO` CLI option.
// TODO: currently unused for dispatch, hence the dead_code allowance.
#[allow(dead_code)]
#[derive(Clone, Debug)]
pub enum WeekLongSimulationScenarios {
    CreateModelInstancesSynced,
}

impl std::str::FromStr for WeekLongSimulationScenarios {
    type Err = String;

    /// Parses a scenario name as given on the command line.
    ///
    /// Returns an error string for any name that is not a known scenario.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        if s == "CreateModelInstancesSynced" {
            return Ok(Self::CreateModelInstancesSynced);
        }
        Err(format!("Invalid scenario: {}", s))
    }
}

/// Options to Simulate command
#[derive(Args, Debug, Clone)]
pub struct WeekLongSimulationOpts {
/// Simulation scenario to run.
#[arg(long, env = "GENERATOR_SCENARIO")]
scenario: String,
scenario: WeekLongSimulationScenarios,

/// Path to file containing the list of peers.
/// File should contain JSON encoding of Vec<Peer>.
Expand All @@ -36,7 +49,7 @@ pub struct WeekLongSimulationOpts {
/// for making requests. They should have low memory overhead, so you can
/// create many tasks and then use `throttle_requests_rate` to constrain the overall
/// throughput on the node (specifically the HTTP requests made).
#[arg(long, default_value_t = 4, env = "GENERATOR_TASKS")]
#[arg(long, default_value_t = 25, env = "GENERATOR_TASKS")]
tasks: usize,

/// Duration of the simulation in hours
Expand Down Expand Up @@ -92,7 +105,7 @@ pub async fn simulate_load(opts: WeekLongSimulationOpts) -> Result<CommandResult

println!("Model: {:?}", model);
let model_instance_creation_result =
create_model_instances_continuously(stable_load_user_1, model, run_time).await;
create_model_instances_continuously(stable_load_user_1, model, run_time, state.tasks).await;
println!(
"Model instance creation result: {:?}",
model_instance_creation_result
Expand All @@ -113,6 +126,7 @@ pub async fn create_model_instances_continuously(
stable_load_user: StableLoadUser,
model: StreamId,
duration_in_hours: u64,
tasks_count: usize,
) -> Result<()> {
let start_time = Instant::now();

Expand All @@ -125,7 +139,7 @@ pub async fn create_model_instances_continuously(
// increasing tasks can help increase throughput
let (tx, mut rx) = tokio::sync::mpsc::channel(10000);
let mut tasks = tokio::task::JoinSet::new();
for i in 0..100 {
for i in 0..tasks_count {
let user_clone = stable_load_user.clone();
let model = model.clone();
let tx = tx.clone();
Expand All @@ -144,28 +158,29 @@ pub async fn create_model_instances_continuously(
Ok(Ok(mid)) => match tx.send(Ok(mid.to_string())).await {
Ok(_) => {}
Err(e) => {
println!("Failed to send MID: {}", e);
eprintln!("Failed to send MID: {}", e);
}
},
Ok(Err(e)) => match tx.send(Err(e.to_string())).await {
Ok(_) => {}
Err(e) => {
println!("Failed to send error: {}", e);
eprintln!("Failed to send error: {}", e);
}
},
Err(e) => match tx.send(Err(e.to_string())).await {
Ok(_) => {}
Err(e) => {
println!("Failed to send error: {}", e);
eprintln!("Failed to send error: {}", e);
}
},
}
}
});
}
// Drop the tx sender, since the exit condition below requires the senders to be dropped for termination
drop(tx);
loop {
let mut mid_vec: Vec<Result<String, String>> = Vec::new();
let mut mid_vec: Vec<Result<String, String>> = Vec::with_capacity(10);
if rx.recv_many(&mut mid_vec, 10).await > 0 {
for mid in mid_vec {
match mid {
Expand All @@ -178,7 +193,8 @@ pub async fn create_model_instances_continuously(
}
}
}
if start_time.elapsed() > duration {
// Add a small buffer to the duration to account for the time it takes to send the MIDs
if start_time.elapsed() > duration + Duration::from_secs(5) {
tasks.abort_all();
break;
}
Expand All @@ -201,6 +217,7 @@ pub async fn create_model_instances_continuously(
/// Validated runtime state for the week-long simulation, built from
/// `WeekLongSimulationOpts` (see `try_from_opts`).
struct WeekLongSimulationState {
    // Peers parsed from the peers file via `parse_peers_info`.
    pub peers: Vec<Peer>,
    // Requested run time, kept as the raw CLI string; presumably parsed into
    // a duration later — TODO confirm against the caller.
    pub run_time: String,
    // Number of concurrent writer tasks to spawn (bounds the spawn loop in
    // `create_model_instances_continuously`).
    pub tasks: usize,
}

impl WeekLongSimulationState {
Expand All @@ -214,6 +231,7 @@ impl WeekLongSimulationState {
Ok(Self {
peers: parse_peers_info(opts.peers.clone()).await?,
run_time: opts.run_time,
tasks: opts.tasks,
})
}

Expand Down
17 changes: 12 additions & 5 deletions runner/src/load_generator/utils/ceramic_models_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,12 @@ impl CeramicModelUser {
if resp.status().is_success() {
Ok(())
} else {
Err(anyhow::anyhow!("Failed to index model"))
Err(anyhow::anyhow!(
"Failed to index model: status {:?} , resp_text {:?}, model_id {:?}",
resp.status(),
resp.text().await,
model_id
))
}
}

Expand Down Expand Up @@ -84,9 +89,10 @@ impl CeramicModelUser {
Ok(streams_response.stream_id)
} else {
Err(anyhow::anyhow!(
"Failed to setup model: status {:?} , resp_text {:?}",
"Failed to setup model: status {:?} , resp_text {:?}, model_schema {:?}",
resp.status(),
resp.text().await
resp.text().await,
model.schema()
))
}
}
Expand Down Expand Up @@ -126,9 +132,10 @@ impl CeramicModelUser {
Ok(parsed_resp.stream_id)
} else {
Err(anyhow::anyhow!(
"Failed to create model: status {:?} , resp_text {:?}",
"Failed to create model: status {:?} , status_text {:?}, model_id {:?}",
resp.status(),
resp.text().await
resp.text().await,
model
))
}
}
Expand Down
4 changes: 2 additions & 2 deletions runner/src/load_generator/utils/generator_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use std::time::Duration;

use super::ceramic_models_utils::CeramicModelUser;

pub static HTTP_TIMEOUT: Duration = Duration::from_secs(5);
pub static HTTP_POOL_MAX_IDLE_PER_HOST: usize = 300;
pub const HTTP_TIMEOUT: Duration = Duration::from_secs(5);
pub const HTTP_POOL_MAX_IDLE_PER_HOST: usize = 300;

#[derive(Clone, Debug)]
pub struct CeramicConfig {
Expand Down
6 changes: 4 additions & 2 deletions runner/src/load_generator/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
pub mod ceramic_models_utils;
pub mod generator_utils;
mod ceramic_models_utils;
mod generator_utils;

pub use generator_utils::*;
16 changes: 8 additions & 8 deletions runner/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,13 @@ async fn main() -> Result<()> {
if !matches!(args.command, Command::GenerateLoad(_)) {
telemetry::init_tracing(Some(args.otlp_endpoint.clone())).await?;
}
let metrics_controller = if matches!(args.command, Command::GenerateLoad(_)) {
None
let (metrics_controller, enable_metrics) = if matches!(args.command, Command::GenerateLoad(_)) {
(None, false)
} else {
Some(telemetry::init_metrics_otlp(args.otlp_endpoint.clone()).await?)
(
Some(telemetry::init_metrics_otlp(args.otlp_endpoint.clone()).await?),
true,
)
};
info!("starting runner");

Expand All @@ -98,13 +101,10 @@ async fn main() -> Result<()> {
Command::GenerateLoad(opts) => simulate_load(opts).await?,
Command::Noop => CommandResult::Success,
};
if !matches!(args.command, Command::GenerateLoad(_)) {
if enable_metrics && metrics_controller.is_some() {
// Flush traces and metrics before shutdown
shutdown_tracer_provider();
if let Some(metrics_controller) = metrics_controller.clone() {
metrics_controller.force_flush()?;
}
drop(metrics_controller);
metrics_controller.unwrap().force_flush()?;
shutdown_meter_provider();
}

Expand Down

0 comments on commit 4f8f9c3

Please sign in to comment.