Skip to content

Commit

Permalink
Merge pull request #32 from dandi/fix-halt
Browse files Browse the repository at this point in the history
Fixes to program termination
  • Loading branch information
jwodder authored Nov 19, 2024
2 parents a742020 + 8a244f6 commit 536849d
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 18 deletions.
10 changes: 6 additions & 4 deletions src/asyncutil/lsg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,13 @@ impl<T: Send + 'static> LimitedShutdownGroup<T> {
}
}

pub(crate) fn close(&self) {
let mut s = self.sender.lock().unwrap_or_else(PoisonError::into_inner);
*s = None;
}

pub(crate) fn shutdown(&self) {
{
let mut s = self.sender.lock().unwrap_or_else(PoisonError::into_inner);
*s = None;
}
self.close();
self.semaphore.close();
self.token.cancel();
}
Expand Down
47 changes: 33 additions & 14 deletions src/syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,45 +46,64 @@ impl Syncer {
for fspec in manifest.files {
let clnt = self.client.clone();
let sender = obj_sender.clone();
inventory_dl_pool.spawn(move |_| async move {
let itemlist = clnt.download_inventory_csv(fspec).await?;
for item in itemlist {
let _ = sender.send(item?).await;
}
Ok(())
inventory_dl_pool.spawn(move |token| async move {
token
.run_until_cancelled(async move {
let itemlist = clnt.download_inventory_csv(fspec).await?;
for item in itemlist {
if sender.send(item?).await.is_err() {
// Assume we're shutting down
return Ok(());
}
}
Ok(())
})
.await
.unwrap_or(Ok(()))
});
}
inventory_dl_pool.close();
drop(obj_sender);

let mut errors = Vec::new();
let mut inventory_pool_closed = false;
let mut object_pool_closed = false;
let mut inventory_pool_finished = false;
let mut object_pool_finished = false;
let mut all_objects_txed = false;
loop {
tokio::select! {
r = inventory_dl_pool.next(), if !inventory_pool_closed => {
r = inventory_dl_pool.next(), if !inventory_pool_finished => {
match r {
Some(Ok(())) => (),
Some(Err(e)) => {
if !errors.is_empty() {
tracing::error!(error = ?e, "error processing inventory lists");
if errors.is_empty() {
inventory_dl_pool.shutdown();
object_dl_pool.shutdown();
}
errors.push(e);
}
None => inventory_pool_closed = true,
None => {
tracing::debug!("Finished processing inventory lists");
inventory_pool_finished = true;
object_dl_pool.close();
}
}
}
r = object_dl_pool.next(), if !object_pool_closed => {
r = object_dl_pool.next(), if !object_pool_finished => {
match r {
Some(Ok(())) => (),
Some(Err(e)) => {
if !errors.is_empty() {
tracing::error!(error = ?e, "error processing objects");
if errors.is_empty() {
inventory_dl_pool.shutdown();
object_dl_pool.shutdown();
}
errors.push(e);
}
None => object_pool_closed = true,
None => {
tracing::debug!("Finished processing objects");
object_pool_finished = true;
}
}
}
r = obj_receiver.recv(), if !all_objects_txed => {
Expand Down

0 comments on commit 536849d

Please sign in to comment.