Skip to content

Commit

Permalink
Merge pull request #1215 from dandi/archive-1497
Browse files Browse the repository at this point in the history
Update for fix to Zarr upload procedure
  • Loading branch information
jwodder authored Feb 22, 2023
2 parents 7e579fb + 8965356 commit 7fa28c8
Showing 1 changed file with 37 additions and 22 deletions.
59 changes: 37 additions & 22 deletions dandi/files/zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ def mkzarr() -> str:
to_upload = EntryUploadTracker()
if old_zarr_entries:
to_delete: list[RemoteZarrEntry] = []
digesting: list[Future[tuple[LocalZarrEntry, str, int]]] = []
digesting: list[Future[tuple[LocalZarrEntry, str, bool]]] = []
yield {"status": "comparing against remote Zarr"}
with ThreadPoolExecutor(max_workers=jobs or 5) as executor:
for local_entry in self.iterfiles():
Expand Down Expand Up @@ -444,9 +444,9 @@ def mkzarr() -> str:
chunked(upload_items, ZARR_UPLOAD_BATCH_SIZE), start=1
):
uploading = []
for (entry_path, digest, size) in items:
zcc.add_leaf(Path(entry_path), size, digest)
uploading.append(entry_path)
for it in items:
zcc.add_leaf(Path(it.entry_path), it.size, it.digest)
uploading.append(it.upload_request())
lgr.debug(
"%s: Uploading Zarr file batch #%d (%s)",
asset_path,
Expand All @@ -459,11 +459,10 @@ def mkzarr() -> str:
executor.submit(
_upload_zarr_file,
storage_session=storage,
path=self.filepath / entry_path,
upload_url=signed_url,
digest=digest,
item=it,
)
for (signed_url, (entry_path, digest, _)) in zip(r, items)
for (signed_url, it) in zip(r, items)
]
changed = True
for fut in as_completed(futures):
Expand Down Expand Up @@ -539,19 +538,17 @@ def mkzarr() -> str:


def _upload_zarr_file(
storage_session: RESTFullAPIClient, path: Path, upload_url: str, digest: str
storage_session: RESTFullAPIClient, upload_url: str, item: UploadItem
) -> int:
with path.open("rb") as fp:
with item.filepath.open("rb") as fp:
storage_session.put(
upload_url,
data=fp,
json_resp=False,
retry_if=_retry_zarr_file,
headers={
"Content-MD5": b64encode(bytes.fromhex(digest)).decode("us-ascii")
},
headers={"Content-MD5": item.base64_digest},
)
return path.stat().st_size
return item.size


def _retry_zarr_file(r: requests.Response) -> bool:
Expand All @@ -574,32 +571,29 @@ class EntryUploadTracker:
"""

total_size: int = 0
digested_entries: list[tuple[LocalZarrEntry, str, int]] = field(
default_factory=list
)
digested_entries: list[UploadItem] = field(default_factory=list)
fresh_entries: list[LocalZarrEntry] = field(default_factory=list)

def register(self, e: LocalZarrEntry, digest: Optional[str] = None) -> None:
if digest is not None:
self.digested_entries.append((e, digest, e.size))
self.digested_entries.append(UploadItem.from_entry(e, digest))
else:
self.fresh_entries.append(e)
self.total_size += e.size

@staticmethod
def _mkitem(e: LocalZarrEntry) -> tuple[str, str, int]:
def _mkitem(e: LocalZarrEntry) -> UploadItem:
digest = md5file_nocache(e.filepath)
return (str(e), digest, e.size)
return UploadItem.from_entry(e, digest)

def get_items(self, jobs: int = 5) -> Generator[tuple[str, str, int], None, None]:
def get_items(self, jobs: int = 5) -> Generator[UploadItem, None, None]:
# Note: In order for the ThreadPoolExecutor to be closed if an error
# occurs during upload, the method must be used like this:
#
# with contextlib.closing(to_upload.get_items()) as upload_items:
# for item in upload_items:
# ...
for e, digest, size in self.digested_entries:
yield (str(e), digest, size)
yield from self.digested_entries
if not self.fresh_entries:
return
with ThreadPoolExecutor(max_workers=jobs) as executor:
Expand All @@ -615,6 +609,27 @@ def get_items(self, jobs: int = 5) -> Generator[tuple[str, str, int], None, None
raise


@dataclass
class UploadItem:
""":meta private:"""

entry_path: str
filepath: Path
digest: str
size: int

@classmethod
def from_entry(cls, e: LocalZarrEntry, digest: str) -> UploadItem:
return cls(entry_path=str(e), filepath=e.filepath, digest=digest, size=e.size)

@property
def base64_digest(self) -> str:
return b64encode(bytes.fromhex(self.digest)).decode("us-ascii")

def upload_request(self) -> dict[str, str]:
return {"path": self.entry_path, "base64md5": self.base64_digest}


def _cmp_digests(
asset_path: str, local_entry: LocalZarrEntry, remote_digest: str
) -> tuple[LocalZarrEntry, str, bool]:
Expand Down

0 comments on commit 7fa28c8

Please sign in to comment.