-
Notifications
You must be signed in to change notification settings - Fork 0
Feat: Upload service #32
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
16 commits
Select commit
Hold shift + click to select a range
65fbfc2
refactor service module
9fddab5
file upload service
cb5e28d
fix service test
cd36ebd
add tests
1a1f258
abort on error
3d6730e
add docstrings
b5cda04
more docstrings
e7ffaf3
improve docstrings
f98126d
catch rate limit exceeded
6668899
add docs
0a2dbe3
Update docs/reference/service.md
jens-kuerten 540177d
Update docs/reference/service.md
jens-kuerten 8feb29f
update dev1
4090d98
Add Forbidden exception handling and update documentation for access …
a75fbc3
Update docs/reference/service.md
jens-kuerten 0461def
Update docs/reference/service.md
jens-kuerten File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
from csfunctions.metadata import MetaData | ||
from csfunctions.service.base import Conflict, NotFound, Unauthorized, UnprocessableEntity | ||
from csfunctions.service.file_upload import FileUploadService | ||
from csfunctions.service.numgen import NumberGeneratorService | ||
|
||
__all__ = [ | ||
"Service", | ||
"FileUploadService", | ||
"NumberGeneratorService", | ||
"Conflict", | ||
"NotFound", | ||
"Unauthorized", | ||
"UnprocessableEntity", | ||
] | ||
|
||
|
||
class Service: | ||
""" | ||
Provides access to services on the elements instance, e.g. generating numbers. | ||
""" | ||
|
||
def __init__(self, metadata: MetaData): | ||
self.generator = NumberGeneratorService(metadata=metadata) | ||
self.file_upload = FileUploadService(metadata=metadata) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
from typing import Optional | ||
|
||
import requests | ||
|
||
from csfunctions.metadata import MetaData | ||
|
||
|
||
class Unauthorized(Exception): | ||
pass | ||
|
||
|
||
class Forbidden(Exception): | ||
pass | ||
|
||
|
||
class Conflict(Exception): | ||
pass | ||
|
||
|
||
class NotFound(Exception): | ||
pass | ||
|
||
|
||
class UnprocessableEntity(Exception): | ||
pass | ||
|
||
|
||
class RateLimitExceeded(Exception): | ||
pass | ||
|
||
|
||
class BaseService: | ||
""" | ||
Base class for services. | ||
""" | ||
|
||
def __init__(self, metadata: MetaData): | ||
# Store full metadata for services that need additional fields (e.g. app_user) | ||
self.metadata = metadata | ||
|
||
def request( | ||
self, endpoint: str, method: str = "GET", params: Optional[dict] = None, json: Optional[dict] = None | ||
) -> dict | list: | ||
""" | ||
Make a request to the access service. | ||
""" | ||
if self.metadata.service_url is None: | ||
raise ValueError("No service url given.") | ||
if self.metadata.service_token is None: | ||
raise ValueError("No service token given.") | ||
|
||
headers = {"Authorization": f"Bearer {self.metadata.service_token}"} | ||
params = params or {} | ||
url = str(self.metadata.service_url).rstrip("/") + "/" + endpoint.lstrip("/") | ||
response = requests.request(method, url=url, params=params, headers=headers, timeout=10, json=json) | ||
|
||
if response.status_code == 401: | ||
raise Unauthorized | ||
if response.status_code == 403: | ||
raise Forbidden | ||
elif response.status_code == 409: | ||
raise Conflict | ||
elif response.status_code == 404: | ||
raise NotFound | ||
elif response.status_code == 422: | ||
raise UnprocessableEntity(response.text) | ||
elif response.status_code == 429: | ||
raise RateLimitExceeded(response.text) | ||
if response.status_code == 200: | ||
return response.json() | ||
else: | ||
raise ValueError(f"Access service responded with status code {response.status_code}.") |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,229 @@ | ||
import hashlib | ||
from copy import deepcopy | ||
from random import choice | ||
from string import ascii_letters | ||
from typing import BinaryIO | ||
|
||
import requests | ||
|
||
from csfunctions.service.base import BaseService | ||
from csfunctions.service.file_upload_schemas import ( | ||
AbortFileUploadRequest, | ||
CompleteFileUploadRequest, | ||
CreateNewFileRequest, | ||
CreateNewFileResponse, | ||
GeneratePresignedUrlRequest, | ||
PresignedWriteUrls, | ||
) | ||
|
||
|
||
def _generate_lock_id(): | ||
"""Generate a random 12-character lock ID.""" | ||
return "".join(choice(ascii_letters) for i in range(12)) # nosec | ||
|
||
|
||
class FileUploadService(BaseService): | ||
def _create_new_file(self, filename: str, parent_object_id: str, persno: str, check_access: bool = True) -> str: | ||
"""Create a new empty file attached to the parent object.""" | ||
response_json = self.request( | ||
endpoint="/file_upload/create", | ||
method="POST", | ||
json=CreateNewFileRequest( | ||
parent_object_id=parent_object_id, filename=filename, persno=persno, check_access=check_access | ||
).model_dump(), | ||
) | ||
data = CreateNewFileResponse.model_validate(response_json) | ||
return data.file_object_id | ||
|
||
def _get_presigned_write_urls( | ||
self, file_object_id: str, filesize: int, lock_id: str, persno: str, check_access: bool = True | ||
) -> PresignedWriteUrls: | ||
"""Request presigned URLs for uploading file chunks.""" | ||
response_json = self.request( | ||
endpoint=f"/file_upload/{file_object_id}/generate_presigned_url", | ||
method="POST", | ||
json=GeneratePresignedUrlRequest( | ||
check_access=check_access, persno=persno, filesize=filesize, lock_id=lock_id | ||
).model_dump(), | ||
) | ||
|
||
return PresignedWriteUrls.model_validate(response_json) | ||
|
||
def _upload_from_stream( | ||
self, presigned_urls: PresignedWriteUrls, stream: BinaryIO | ||
) -> tuple[PresignedWriteUrls, str]: | ||
"""Upload file stream in chunks and return updated presigned URLs and sha256 hash.""" | ||
etags: list[str] = [] | ||
sha256 = hashlib.sha256() | ||
for url in presigned_urls.urls: | ||
data: bytes = stream.read(presigned_urls.chunksize) | ||
sha256.update(data) | ||
resp = requests.put(url, data=data, headers=presigned_urls.headers, timeout=20) | ||
# 20 second timeout to stay below 30s max execution time of the Function | ||
# otherwise we won't get a proper error message in the logs | ||
resp.raise_for_status() | ||
etag = resp.headers.get("ETag") | ||
if etag: | ||
etags.append(etag) | ||
updated = deepcopy(presigned_urls) | ||
if etags: | ||
updated.etags = etags | ||
return updated, sha256.hexdigest() | ||
|
||
@staticmethod | ||
def _get_stream_size(stream: BinaryIO) -> int: | ||
"""Get the size of a seekable stream.""" | ||
if not stream.seekable(): | ||
raise ValueError("Stream is not seekable; size cannot be determined.") | ||
current_pos = stream.tell() | ||
stream.seek(0, 2) | ||
size = stream.tell() | ||
stream.seek(current_pos) | ||
return size | ||
|
||
def _complete_upload( | ||
self, | ||
file_object_id: str, | ||
filesize: int, | ||
lock_id: str, | ||
presigned_urls: PresignedWriteUrls, | ||
persno: str, | ||
check_access: bool = True, | ||
sha256: str | None = None, | ||
delete_derived_files: bool = True, | ||
) -> None: | ||
"""Mark the upload as complete and finalize the file.""" | ||
self.request( | ||
endpoint=f"/file_upload/{file_object_id}/complete", | ||
method="POST", | ||
json=CompleteFileUploadRequest( | ||
filesize=filesize, | ||
check_access=check_access, | ||
persno=persno, | ||
presigned_write_urls=presigned_urls, | ||
lock_id=lock_id, | ||
sha256=sha256, | ||
delete_derived_files=delete_derived_files, | ||
).model_dump(), | ||
) | ||
|
||
def _abort_upload( | ||
self, file_object_id: str, lock_id: str, persno: str, presigned_write_urls: PresignedWriteUrls | ||
) -> None: | ||
"""Abort an ongoing file upload.""" | ||
self.request( | ||
endpoint=f"/file_upload/{file_object_id}/abort", | ||
method="POST", | ||
json=AbortFileUploadRequest( | ||
lock_id=lock_id, | ||
persno=persno, | ||
presigned_write_urls=presigned_write_urls, | ||
).model_dump(), | ||
) | ||
|
||
def upload_file_content( | ||
self, | ||
file_object_id: str, | ||
stream: BinaryIO, | ||
persno: str | None = None, | ||
check_access: bool = True, | ||
filesize: int | None = None, | ||
delete_derived_files: bool = True, | ||
) -> None: | ||
""" | ||
Uploads content to an existing file object in chunks using presigned URLs. | ||
Handles aborting the upload if an error occurs. | ||
|
||
Args: | ||
file_object_id: The ID of the file object to upload to. | ||
stream: A binary stream containing the file data. | ||
persno: The user/person number who is uploading the file (default is user that triggered the Function). | ||
check_access: Whether to check access permissions. | ||
filesize: Size of the file in bytes (required only if the stream is not seekable). | ||
delete_derived_files: Whether to delete derived files after upload. | ||
|
||
Raises: | ||
csfunctions.service.Forbidden: If access check fails. | ||
csfunctions.service.Unauthorized: If the service token is invalid. | ||
csfunctions.service.Conflict: If the file is already locked. | ||
csfunctions.service.NotFound: If the file object does not exist. | ||
csfunctions.service.RateLimitExceeded: If the services rate limit is exceeded. | ||
""" | ||
persno = persno or self.metadata.app_user | ||
if filesize is None: | ||
filesize = self._get_stream_size(stream) | ||
lock_id = _generate_lock_id() | ||
presigned = self._get_presigned_write_urls( | ||
file_object_id=file_object_id, | ||
filesize=filesize, | ||
lock_id=lock_id, | ||
persno=persno, | ||
check_access=check_access, | ||
) | ||
try: | ||
presigned_with_etags, sha256 = self._upload_from_stream(presigned_urls=presigned, stream=stream) | ||
self._complete_upload( | ||
file_object_id=file_object_id, | ||
filesize=filesize, | ||
lock_id=lock_id, | ||
presigned_urls=presigned_with_etags, | ||
persno=persno, | ||
check_access=check_access, | ||
sha256=sha256, | ||
delete_derived_files=delete_derived_files, | ||
) | ||
except Exception as e: | ||
jens-kuerten marked this conversation as resolved.
Show resolved
Hide resolved
|
||
# if something goes wrong during upload we try to abort | ||
self._abort_upload( | ||
file_object_id=file_object_id, | ||
lock_id=lock_id, | ||
persno=persno, | ||
presigned_write_urls=presigned, | ||
) | ||
raise e | ||
|
||
def upload_new_file( | ||
self, | ||
parent_object_id: str, | ||
filename: str, | ||
stream: BinaryIO, | ||
persno: str | None = None, | ||
check_access: bool = True, | ||
filesize: int | None = None, | ||
) -> str: | ||
""" | ||
Creates a new file attached to the parent object and uploads content from the provided stream. | ||
|
||
Args: | ||
parent_object_id: The ID of the parent object to attach the file to. | ||
filename: The name of the new file. | ||
stream: A binary stream containing the file data. | ||
persno: The user/person number who is uploading the file (default is user that triggered the Function). | ||
check_access: Whether to check access permissions. | ||
filesize: Size of the file in bytes (required only if the stream is not seekable). | ||
|
||
Returns: | ||
The ID of the newly created file object. | ||
|
||
Raises: | ||
csfunctions.service.Forbidden: If access check fails. | ||
csfunctions.service.Unauthorized: If the service token is invalid. | ||
csfunctions.service.NotFound: If the parent object does not exist. | ||
csfunctions.service.RateLimitExceeded: If the services rate limit is exceeded. | ||
""" | ||
persno = persno or self.metadata.app_user | ||
file_object_id = self._create_new_file( | ||
filename=filename, | ||
parent_object_id=parent_object_id, | ||
persno=persno, | ||
check_access=check_access, | ||
) | ||
self.upload_file_content( | ||
file_object_id=file_object_id, | ||
stream=stream, | ||
persno=persno, | ||
check_access=check_access, | ||
filesize=filesize, | ||
delete_derived_files=False, | ||
) | ||
return file_object_id |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.