diff --git a/runner/src/load_generator/gen.rs b/runner/src/load_generator/gen.rs
index a2cd273..a56d588 100644
--- a/runner/src/load_generator/gen.rs
+++ b/runner/src/load_generator/gen.rs
@@ -1,10 +1,9 @@
 use std::collections::HashMap;
 use std::path::PathBuf;
 
-use crate::load_generator::utils::generator_utils::CeramicConfig;
-use crate::load_generator::utils::generator_utils::CeramicDidType;
-use crate::load_generator::utils::generator_utils::CeramicScenarioParameters;
-use crate::load_generator::utils::generator_utils::StableLoadUser;
+use crate::load_generator::utils::{
+    CeramicConfig, CeramicDidType, CeramicScenarioParameters, StableLoadUser,
+};
 use crate::utils::parse_peers_info;
 use crate::CommandResult;
 use anyhow::Result;
@@ -16,16 +15,30 @@ use tokio::time::{Duration, Instant};
 // TODO : Use this to envoke a particular scenario, currently we only have one
 // so this is unused
 #[allow(dead_code)]
+#[derive(Clone, Debug)]
 pub enum WeekLongSimulationScenarios {
     CreateModelInstancesSynced,
 }
 
+impl std::str::FromStr for WeekLongSimulationScenarios {
+    type Err = String;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s {
+            "CreateModelInstancesSynced" => {
+                Ok(WeekLongSimulationScenarios::CreateModelInstancesSynced)
+            }
+            _ => Err(format!("Invalid scenario: {}", s)),
+        }
+    }
+}
+
 /// Options to Simulate command
 #[derive(Args, Debug, Clone)]
 pub struct WeekLongSimulationOpts {
     /// Simulation scenario to run.
     #[arg(long, env = "GENERATOR_SCENARIO")]
-    scenario: String,
+    scenario: WeekLongSimulationScenarios,
 
     /// Path to file containing the list of peers.
     /// File should contian JSON encoding of Vec<Peer>.
@@ -36,7 +49,7 @@ pub struct WeekLongSimulationOpts {
     /// for making requests. They should have low memory overhead, so you can
     /// create many tasks and then use `throttle_requests_rate` to constrain the overall
     /// throughput on the node (specifically the HTTP requests made).
-    #[arg(long, default_value_t = 4, env = "GENERATOR_TASKS")]
+    #[arg(long, default_value_t = 25, env = "GENERATOR_TASKS")]
     tasks: usize,
 
     /// Duration of the simulation in hours
@@ -92,7 +105,7 @@ pub async fn simulate_load(opts: WeekLongSimulationOpts) -> Result<CommandResult> {
 ) -> Result<()> {
     let start_time = Instant::now();
@@ -125,7 +139,7 @@ pub async fn create_model_instances_continuously(
     // increasing tasks can help increase throughput
     let (tx, mut rx) = tokio::sync::mpsc::channel(10000);
     let mut tasks = tokio::task::JoinSet::new();
-    for i in 0..100 {
+    for i in 0..tasks_count {
         let user_clone = stable_load_user.clone();
         let model = model.clone();
         let tx = tx.clone();
@@ -144,28 +158,29 @@
                     Ok(Ok(mid)) => match tx.send(Ok(mid.to_string())).await {
                         Ok(_) => {}
                         Err(e) => {
-                            println!("Failed to send MID: {}", e);
+                            eprintln!("Failed to send MID: {}", e);
                         }
                     },
                     Ok(Err(e)) => match tx.send(Err(e.to_string())).await {
                         Ok(_) => {}
                         Err(e) => {
-                            println!("Failed to send error: {}", e);
+                            eprintln!("Failed to send error: {}", e);
                         }
                     },
                     Err(e) => match tx.send(Err(e.to_string())).await {
                         Ok(_) => {}
                         Err(e) => {
-                            println!("Failed to send error: {}", e);
+                            eprintln!("Failed to send error: {}", e);
                         }
                     },
                 }
             }
         });
     }
+    // Drop the tx sender, since the exit condition below requires the senders to be dropped for termination
     drop(tx);
     loop {
-        let mut mid_vec: Vec<Result<String, String>> = Vec::new();
+        let mut mid_vec: Vec<Result<String, String>> = Vec::with_capacity(10);
         if rx.recv_many(&mut mid_vec, 10).await > 0 {
             for mid in mid_vec {
                 match mid {
@@ -178,7 +193,8 @@ pub async fn create_model_instances_continuously(
                }
            }
        }
-        if start_time.elapsed() > duration {
+        // Add a small buffer to the duration to account for the time it takes to send the MIDs
+        if start_time.elapsed() > duration + Duration::from_secs(5) {
            tasks.abort_all();
            break;
        }
@@ -201,6 +217,7 @@
 struct WeekLongSimulationState {
     pub peers: Vec<Peer>,
     pub run_time: String,
+    pub tasks: usize,
 }
 
 impl WeekLongSimulationState {
@@ -214,6 +231,7 @@ impl WeekLongSimulationState {
         Ok(Self {
             peers: parse_peers_info(opts.peers.clone()).await?,
             run_time: opts.run_time,
+            tasks: opts.tasks,
         })
     }
 
diff --git a/runner/src/load_generator/utils/ceramic_models_utils.rs b/runner/src/load_generator/utils/ceramic_models_utils.rs
index 26088e1..05c54d0 100644
--- a/runner/src/load_generator/utils/ceramic_models_utils.rs
+++ b/runner/src/load_generator/utils/ceramic_models_utils.rs
@@ -45,7 +45,12 @@ impl CeramicModelUser {
         if resp.status().is_success() {
             Ok(())
         } else {
-            Err(anyhow::anyhow!("Failed to index model"))
+            Err(anyhow::anyhow!(
+                "Failed to index model: status {:?} , resp_text {:?}, model_id {:?}",
+                resp.status(),
+                resp.text().await,
+                model_id
+            ))
         }
     }
 
@@ -84,9 +89,10 @@ impl CeramicModelUser {
             Ok(streams_response.stream_id)
         } else {
             Err(anyhow::anyhow!(
-                "Failed to setup model: status {:?} , resp_text {:?}",
+                "Failed to setup model: status {:?} , resp_text {:?}, model_schema {:?}",
                 resp.status(),
-                resp.text().await
+                resp.text().await,
+                model.schema()
             ))
         }
     }
@@ -126,9 +132,10 @@ impl CeramicModelUser {
             Ok(parsed_resp.stream_id)
         } else {
             Err(anyhow::anyhow!(
-                "Failed to create model: status {:?} , resp_text {:?}",
+                "Failed to create model: status {:?} , status_text {:?}, model_id {:?}",
                 resp.status(),
-                resp.text().await
+                resp.text().await,
+                model
             ))
         }
     }
diff --git a/runner/src/load_generator/utils/generator_utils.rs b/runner/src/load_generator/utils/generator_utils.rs
index f96ea72..65b7ac4 100644
--- a/runner/src/load_generator/utils/generator_utils.rs
+++ b/runner/src/load_generator/utils/generator_utils.rs
@@ -7,8 +7,8 @@ use std::time::Duration;
 
 use super::ceramic_models_utils::CeramicModelUser;
 
-pub static HTTP_TIMEOUT: Duration = Duration::from_secs(5);
-pub static HTTP_POOL_MAX_IDLE_PER_HOST: usize = 300;
+pub const HTTP_TIMEOUT: Duration = Duration::from_secs(5);
+pub const HTTP_POOL_MAX_IDLE_PER_HOST: usize = 300;
 
 #[derive(Clone, Debug)]
 pub struct CeramicConfig {
diff --git a/runner/src/load_generator/utils/mod.rs b/runner/src/load_generator/utils/mod.rs
index 3fc1ccf..1749f02 100644
--- a/runner/src/load_generator/utils/mod.rs
+++ b/runner/src/load_generator/utils/mod.rs
@@ -1,2 +1,4 @@
-pub mod ceramic_models_utils;
-pub mod generator_utils;
+mod ceramic_models_utils;
+mod generator_utils;
+
+pub use generator_utils::*;
diff --git a/runner/src/main.rs b/runner/src/main.rs
index 78395ec..248a366 100644
--- a/runner/src/main.rs
+++ b/runner/src/main.rs
@@ -76,10 +76,13 @@ async fn main() -> Result<()> {
     if !matches!(args.command, Command::GenerateLoad(_)) {
         telemetry::init_tracing(Some(args.otlp_endpoint.clone())).await?;
     }
-    let metrics_controller = if matches!(args.command, Command::GenerateLoad(_)) {
-        None
+    let (metrics_controller, enable_metrics) = if matches!(args.command, Command::GenerateLoad(_)) {
+        (None, false)
     } else {
-        Some(telemetry::init_metrics_otlp(args.otlp_endpoint.clone()).await?)
+        (
+            Some(telemetry::init_metrics_otlp(args.otlp_endpoint.clone()).await?),
+            true,
+        )
     };
 
     info!("starting runner");
@@ -98,13 +101,10 @@ async fn main() -> Result<()> {
         Command::GenerateLoad(opts) => simulate_load(opts).await?,
         Command::Noop => CommandResult::Success,
     };
-    if !matches!(args.command, Command::GenerateLoad(_)) {
+    if enable_metrics && metrics_controller.is_some() {
         // Flush traces and metrics before shutdown
         shutdown_tracer_provider();
-        if let Some(metrics_controller) = metrics_controller.clone() {
-            metrics_controller.force_flush()?;
-        }
-        drop(metrics_controller);
+        metrics_controller.unwrap().force_flush()?;
         shutdown_meter_provider();
     }