Skip to content

Commit

Permalink
Do not update on progress more frequently than once in two seconds pe…
Browse files Browse the repository at this point in the history
…r file or when done

This is primarily to resolve the problem of pyout "dragging us behind" which
slows down entire download process.

#1549
  • Loading branch information
yarikoptic committed Dec 16, 2024
1 parent dc938f6 commit a0ba0cb
Showing 1 changed file with 30 additions and 5 deletions.
35 changes: 30 additions & 5 deletions dandi/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,13 +358,37 @@ def download_generator(self) -> Iterator[dict]:
lock=lock,
)

def _progress_filter(gen):
"""To reduce load on pyout etc, make progress reports only if enough time
from prior report has passed (over 2 seconds) or we are done (got 100%).
Note that it requires "awareness" from the code below to issue other messages
with bundling with done% reporting if reporting on progress of some kind (e.g.,
adjusting "message").
"""
prior_time = 0
warned = False
for rec in gen:
current_time = time.time()
if done_perc := rec.get("done%", 0):
if isinstance(done_perc, (int, float)):
if current_time - prior_time < 2 and done_perc != 100:
continue
elif not warned:
warned = True
lgr.warning(
"Received non numeric done%%: %r", done_perc
)
prior_time = current_time
yield rec

# If exception is raised we might just raise it, or yield
# an error record
gen = {
"raise": _download_generator,
"yield": _download_generator_guard(path, _download_generator),
}[self.on_error]

gen = _progress_filter(gen)
if self.yield_generator_for_fields:
yield {"path": path, self.yield_generator_for_fields: gen}
else:
Expand Down Expand Up @@ -1247,9 +1271,9 @@ def feed(self, path: str, status: dict) -> Iterator[dict]:
self.files[path].downloaded = size
self.maxsize += size
self.set_status(out)
yield out
if self.zarr_size:
yield self.get_done()
out.update(self.get_done())
yield out
elif keys == ["size"]:
self.files[path].size = size
self.maxsize += status["size"]
Expand All @@ -1274,11 +1298,11 @@ def feed(self, path: str, status: dict) -> Iterator[dict]:
self.files[path].state = DLState.ERROR
out = {"message": self.message}
self.set_status(out)
yield out
sz = self.files[path].size
if sz is not None:
self.maxsize -= sz
yield self.get_done()
out.update(self.get_done())
yield out
elif keys == ["checksum"]:
pass
elif status == {"status": "setting mtime"}:
Expand All @@ -1287,6 +1311,7 @@ def feed(self, path: str, status: dict) -> Iterator[dict]:
self.files[path].state = DLState.DONE
out = {"message": self.message}
self.set_status(out)
out.update(self.get_done())
yield out
else:
lgr.warning(
Expand Down

0 comments on commit a0ba0cb

Please sign in to comment.