diff --git a/src/syncer/mod.rs b/src/syncer/mod.rs index 6f2ca0f..2f8cb32 100644 --- a/src/syncer/mod.rs +++ b/src/syncer/mod.rs @@ -104,6 +104,20 @@ impl Syncer { } pub(crate) async fn run(self: &Arc, manifest: CsvManifest) -> Result<(), MultiError> { + self.spawwn_cltrc_listener(); + let fspecs = self.sort_csvs_by_first_line(manifest.files).await?; + tracing::trace!(path = %self.outdir.display(), "Creating root output directory"); + fs_err::create_dir_all(&self.outdir).map_err(|e| MultiError(vec![e.into()]))?; + let (nursery, nursery_stream) = Nursery::new(); + self.spawn_inventory_task(&nursery, fspecs); + self.spawn_object_tasks(&nursery); + drop(nursery); + let r = self.await_nursery(nursery_stream).await; + self.filterlog.finish(); + r + } + + fn spawwn_cltrc_listener(self: &Arc) { tokio::spawn({ let this = self.clone(); async move { @@ -114,12 +128,13 @@ impl Syncer { } } }); + } - let fspecs = self.sort_csvs_by_first_line(manifest.files).await?; - - tracing::trace!(path = %self.outdir.display(), "Creating root output directory"); - fs_err::create_dir_all(&self.outdir).map_err(|e| MultiError(vec![e.into()]))?; - let (nursery, nursery_stream) = Nursery::new(); + fn spawn_inventory_task( + self: &Arc, + nursery: &Nursery>, + fspecs: Vec, + ) { let obj_sender = { let guard = self .obj_sender @@ -130,9 +145,7 @@ impl Syncer { .cloned() .expect("obj_sender should not be None") }; - let this = self.clone(); - let sender = obj_sender.clone(); let subnursery = nursery.clone(); nursery.spawn( self.until_cancelled_ok(async move { @@ -159,7 +172,7 @@ impl Syncer { } else { None }; - if sender.send((item, notify)).await.is_err() { + if obj_sender.send((item, notify)).await.is_err() { // Assume we're shutting down return Ok(()); } @@ -178,7 +191,6 @@ impl Syncer { Ok(()) }) ); - drop(obj_sender); { let mut guard = self .obj_sender @@ -186,7 +198,9 @@ impl Syncer { .expect("obj_sender mutex should not be poisoned"); *guard = None; } + } + fn spawn_object_tasks(self: &Arc, nursery: &Nursery>) { for _ in 0..self.jobs.get() { let this = self.clone(); let recv = self.obj_receiver.clone(); @@ -204,11 +218,6 @@ impl Syncer { Ok(()) }); } - - drop(nursery); - let r = self.await_nursery(nursery_stream).await; - self.filterlog.finish(); - r } /// Fetch the first line of each inventory list file in `specs` and sort