diff --git a/crates/arroyo-worker/src/arrow/async_udf.rs b/crates/arroyo-worker/src/arrow/async_udf.rs index 147e0ec42..5220a0d34 100644 --- a/crates/arroyo-worker/src/arrow/async_udf.rs +++ b/crates/arroyo-worker/src/arrow/async_udf.rs @@ -125,7 +125,7 @@ impl ArrowOperator for AsyncUdfOperator { "input_exprs", self.input_exprs .iter() - .map(|e| format!("{}", e)) + .map(|e| format!("{:?}", e)) .collect::>() .join(", ") .into(), @@ -134,7 +134,7 @@ impl ArrowOperator for AsyncUdfOperator { "final_exprs", self.final_exprs .iter() - .map(|e| format!("{}", e)) + .map(|e| format!("{:?}", e)) .collect::>() .join(", ") .into(), diff --git a/crates/arroyo-worker/src/lib.rs b/crates/arroyo-worker/src/lib.rs index a767652fa..f7cb684eb 100644 --- a/crates/arroyo-worker/src/lib.rs +++ b/crates/arroyo-worker/src/lib.rs @@ -412,7 +412,7 @@ impl WorkerGrpc for WorkerServer { let logical = LogicalProgram::try_from(req.program.expect("Program is None")) .expect("Failed to create LogicalProgram"); - if let Ok(v) = to_d2(&logical) { + if let Ok(v) = to_d2(&logical).await { debug!("Starting execution for graph\n{}", v); } diff --git a/crates/arroyo-worker/src/utils.rs b/crates/arroyo-worker/src/utils.rs index b615e12d4..b8ceb1fb9 100644 --- a/crates/arroyo-worker/src/utils.rs +++ b/crates/arroyo-worker/src/utils.rs @@ -1,5 +1,4 @@ use crate::engine::construct_operator; -use anyhow::bail; use arrow_schema::Schema; use arroyo_datastream::logical::LogicalProgram; use arroyo_df::physical::new_registry; @@ -14,15 +13,19 @@ fn format_arrow_schema_fields(schema: &Schema) -> Vec<(String, String)> { .collect() } -pub fn to_d2(logical: &LogicalProgram) -> anyhow::Result { - let registry = Arc::new(new_registry()); +pub async fn to_d2(logical: &LogicalProgram) -> anyhow::Result { + let mut registry = new_registry(); - if !logical.program_config.udf_dylibs.is_empty() - || !logical.program_config.python_udfs.is_empty() - { - bail!("UDFs are not yet supported in the pipeline visualizer"); + for (name, udf) in &logical.program_config.udf_dylibs { + registry.load_dylib(name, udf).await?; } + for udf in logical.program_config.python_udfs.values() { + registry.add_python_udf(udf).await?; + } + + let registry = Arc::new(registry); + let mut d2 = String::new(); for idx in logical.graph.node_indices() { diff --git a/crates/arroyo/src/main.rs b/crates/arroyo/src/main.rs index bbab8b013..e466bb4c3 100644 --- a/crates/arroyo/src/main.rs +++ b/crates/arroyo/src/main.rs @@ -482,9 +482,11 @@ async fn visualize(query: Input, open: bool) { .await .expect("Failed while planning query"); + let d2 = utils::to_d2(&compiled.program).await.unwrap(); + if open { let tmp = temp_dir().join("plan.d2"); - tokio::fs::write(&tmp, utils::to_d2(&compiled.program).unwrap()) + tokio::fs::write(&tmp, d2) .await .expect("Failed to write plan"); let output = tmp.with_extension("svg"); @@ -506,6 +508,6 @@ async fn visualize(query: Input, open: bool) { let _ = open::that(format!("file://{}", output.to_string_lossy())); } else { - println!("{}", utils::to_d2(&compiled.program).unwrap()); + println!("{}", d2); } }