Merge pull request #310 from dandi/gh-309
Set chunk size on per-file basis; limit to 1000 chunks; upload files up to 400GB ATM
yarikoptic authored Dec 8, 2020
2 parents da3b7a7 + 67e0ad5 commit 0f589ef
Showing 2 changed files with 65 additions and 8 deletions.
65 changes: 65 additions & 0 deletions dandi/girder.py
@@ -556,6 +556,71 @@ def lock_dandiset(self, dandiset_identifier: str):
f"Failed to unlock dandiset {dandiset_identifier} due to: {msg}"
)

NGINX_MAX_CHUNK_SIZE = 400 * (1 << 20) # 400 MiB

def _uploadContents(self, uploadObj, stream, size, progressCallback=None):
"""
Uploads contents of a file. Overridden so that the chunk size can be
set on a per-file basis.
:param uploadObj: The upload object containing the upload id.
:type uploadObj: dict
:param stream: Readable stream object.
:type stream: file-like
:param size: The length of the file. This must be exactly equal to the
total number of bytes that will be read from ``stream``, otherwise
the upload will fail.
:type size: int
:param progressCallback: If passed, will be called after each chunk
with progress information. It passes a single positional argument
to the callable: a dict of information about the upload progress.
:type progressCallback: callable
"""
offset = 0
uploadId = uploadObj["_id"]

chunk_size = max(self.MAX_CHUNK_SIZE, (size + 999) // 1000)
if chunk_size > self.NGINX_MAX_CHUNK_SIZE:
raise Exception("File requires too many chunks to upload")

with self.progressReporterCls(
label=uploadObj.get("name", ""), length=size
) as reporter:

while True:
chunk = stream.read(min(chunk_size, (size - offset)))

if not chunk:
break

if isinstance(chunk, str):
chunk = chunk.encode("utf8")

uploadObj = self.post(
"file/chunk?offset=%d&uploadId=%s" % (offset, uploadId),
data=gcl._ProgressBytesIO(chunk, reporter=reporter),
)

if "_id" not in uploadObj:
raise Exception(
"After uploading a file chunk, did not receive object with _id. "
"Got instead: " + json.dumps(uploadObj)
)

offset += len(chunk)

if callable(progressCallback):
progressCallback({"current": offset, "total": size})

if offset != size:
self.delete("file/upload/" + uploadId)
raise gcl.IncorrectUploadLengthError(
"Expected upload to be %d bytes, but received %d." % (size, offset),
upload=uploadObj,
)

return uploadObj


def _harmonize_girder_dandiset_to_dandi_api(rec):
"""
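For reference, a minimal sketch (not part of the commit) of the chunk-size arithmetic introduced in _uploadContents above. The 64 MiB value for MAX_CHUNK_SIZE is an assumption standing in for girder_client's client-wide default; only NGINX_MAX_CHUNK_SIZE comes from the diff itself.

MAX_CHUNK_SIZE = 64 * (1 << 20)          # assumed girder_client default per-chunk size
NGINX_MAX_CHUNK_SIZE = 400 * (1 << 20)   # 400 MiB cap, as defined in the diff

def chunk_size_for(size):
    # At most 1000 chunks per file, never smaller than the default chunk size.
    chunk_size = max(MAX_CHUNK_SIZE, (size + 999) // 1000)
    if chunk_size > NGINX_MAX_CHUNK_SIZE:
        raise ValueError("File requires too many chunks to upload")
    return chunk_size

# 10 GiB keeps the 64 MiB default (~160 chunks); 350 GB grows to ~350 MB chunks
# (exactly 1000); 450 GB exceeds the 1000 x 400 MiB ceiling and is rejected.
for size in (10 * (1 << 30), 350 * 10**9, 450 * 10**9):
    try:
        print(size, "->", chunk_size_for(size))
    except ValueError as exc:
        print(size, "->", exc)

The effective upper bound is therefore 1000 chunks of at most 400 MiB each, roughly 419 GB, which matches the "up to 400GB ATM" wording in the commit message.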
8 changes: 0 additions & 8 deletions dandi/upload.py
@@ -166,14 +166,6 @@ def process_path(path, relpath):
try:
path_stat = path.stat()
yield {"size": path_stat.st_size}
size_cut_off = 67108864000
if path_stat.st_size > size_cut_off:
raise RuntimeError(
"Too large! We are experiencing problems uploading files larger than %s. "
"See https://github.com/dandi/dandiarchive/issues/517 and update "
"client when the issue is resolved."
% (naturalsize(size_cut_off))
)
except FileNotFoundError:
yield skip_file("ERROR: File not found")
return
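For context, a rough comparison (sketch only, assumed arithmetic) of the cut-off removed above against the ceiling implied by the new per-file chunking in dandi/girder.py:

old_cut_off = 67108864000               # bytes; the removed size_cut_off (~67.1 GB)
new_ceiling = 1000 * 400 * (1 << 20)    # 1000 chunks of at most 400 MiB (~419.4 GB)
print(new_ceiling / old_cut_off)        # 6.25 -- the new scheme accepts files 6.25x larger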
