Skip to content

Commit

Permalink
Break up Syncer::run()
Browse files Browse the repository at this point in the history
  • Loading branch information
jwodder committed Jan 13, 2025
1 parent 1c08933 commit b3b1deb
Showing 1 changed file with 23 additions and 14 deletions.
37 changes: 23 additions & 14 deletions src/syncer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,20 @@ impl Syncer {
}

pub(crate) async fn run(self: &Arc<Self>, manifest: CsvManifest) -> Result<(), MultiError> {
self.spawwn_cltrc_listener();
let fspecs = self.sort_csvs_by_first_line(manifest.files).await?;
tracing::trace!(path = %self.outdir.display(), "Creating root output directory");
fs_err::create_dir_all(&self.outdir).map_err(|e| MultiError(vec![e.into()]))?;
let (nursery, nursery_stream) = Nursery::new();
self.spawn_inventory_task(&nursery, fspecs);
self.spawn_object_tasks(&nursery);
drop(nursery);
let r = self.await_nursery(nursery_stream).await;
self.filterlog.finish();
r
}

Check warning on line 118 in src/syncer/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/syncer/mod.rs#L107-L118

Added lines #L107 - L118 were not covered by tests

fn spawwn_cltrc_listener(self: &Arc<Self>) {

Check warning on line 120 in src/syncer/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/syncer/mod.rs#L120

Added line #L120 was not covered by tests
tokio::spawn({
let this = self.clone();
async move {
Expand All @@ -114,12 +128,13 @@ impl Syncer {
}
}
});
}

Check warning on line 131 in src/syncer/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/syncer/mod.rs#L131

Added line #L131 was not covered by tests

let fspecs = self.sort_csvs_by_first_line(manifest.files).await?;

tracing::trace!(path = %self.outdir.display(), "Creating root output directory");
fs_err::create_dir_all(&self.outdir).map_err(|e| MultiError(vec![e.into()]))?;
let (nursery, nursery_stream) = Nursery::new();
fn spawn_inventory_task(
self: &Arc<Self>,
nursery: &Nursery<anyhow::Result<()>>,
fspecs: Vec<FileSpec>,
) {

Check warning on line 137 in src/syncer/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/syncer/mod.rs#L133-L137

Added lines #L133 - L137 were not covered by tests
let obj_sender = {
let guard = self
.obj_sender
Expand All @@ -130,9 +145,7 @@ impl Syncer {
.cloned()
.expect("obj_sender should not be None")
};

let this = self.clone();
let sender = obj_sender.clone();
let subnursery = nursery.clone();
nursery.spawn(
self.until_cancelled_ok(async move {
Expand All @@ -159,7 +172,7 @@ impl Syncer {
} else {
None

Check warning on line 173 in src/syncer/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/syncer/mod.rs#L173

Added line #L173 was not covered by tests
};
if sender.send((item, notify)).await.is_err() {
if obj_sender.send((item, notify)).await.is_err() {

Check warning on line 175 in src/syncer/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/syncer/mod.rs#L175

Added line #L175 was not covered by tests
// Assume we're shutting down
return Ok(());

Check warning on line 177 in src/syncer/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/syncer/mod.rs#L177

Added line #L177 was not covered by tests
}
Expand All @@ -178,15 +191,16 @@ impl Syncer {
Ok(())
})
);

Check warning on line 193 in src/syncer/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/syncer/mod.rs#L192-L193

Added lines #L192 - L193 were not covered by tests
drop(obj_sender);
{
let mut guard = self
.obj_sender
.lock()
.expect("obj_sender mutex should not be poisoned");
*guard = None;
}
}

Check warning on line 201 in src/syncer/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/syncer/mod.rs#L201

Added line #L201 was not covered by tests

fn spawn_object_tasks(self: &Arc<Self>, nursery: &Nursery<anyhow::Result<()>>) {
for _ in 0..self.jobs.get() {

Check warning on line 204 in src/syncer/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/syncer/mod.rs#L203-L204

Added lines #L203 - L204 were not covered by tests
let this = self.clone();
let recv = self.obj_receiver.clone();
Expand All @@ -204,11 +218,6 @@ impl Syncer {
Ok(())
});
}

drop(nursery);
let r = self.await_nursery(nursery_stream).await;
self.filterlog.finish();
r
}

Check warning on line 221 in src/syncer/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/syncer/mod.rs#L221

Added line #L221 was not covered by tests

/// Fetch the first line of each inventory list file in `specs` and sort
Expand Down

0 comments on commit b3b1deb

Please sign in to comment.