diff --git a/src/common/boto_client.py b/src/common/boto_client.py index 1a0a855..65d9dd2 100644 --- a/src/common/boto_client.py +++ b/src/common/boto_client.py @@ -48,7 +48,7 @@ def check_bucket_exists(bucket_name: str, botoclient: boto3.client = Depends(get try: botoclient.head_bucket(Bucket=bucket_name) - _log.warning(f"==> Bucket '{bucket_name}' already exists.") + _log.warning(f"==> Bucket '{bucket_name}' exists.") except (exceptions.ClientError, exceptions.BotoCoreError) as exc: error_message = exc.response.get("Error", {}).get("Message", "An error occurred") status_code = int(exc.response.get("ResponseMetadata", {}).get("HTTPStatusCode", status.HTTP_400_BAD_REQUEST)) @@ -56,13 +56,13 @@ def check_bucket_exists(bucket_name: str, botoclient: boto3.client = Depends(get if status_code == status.HTTP_404_NOT_FOUND: raise CustomHTTPException( error_code=SfsErrorCodes.SFS_INVALID_NAME, - error_message=error_message, + error_message=f"Bucket '{bucket_name}' does not exist. Please create it first. {error_message}", status_code=status.HTTP_404_NOT_FOUND, ) from exc elif status_code == status.HTTP_403_FORBIDDEN: raise CustomHTTPException( error_code=SfsErrorCodes.SFS_ACCESS_DENIED, - error_message=error_message, + error_message=f"Access denied to check the bucket '{bucket_name}'.", status_code=status.HTTP_403_FORBIDDEN, ) from exc else: diff --git a/src/common/error_codes.py b/src/common/error_codes.py index 0a86ecc..ecc9afb 100644 --- a/src/common/error_codes.py +++ b/src/common/error_codes.py @@ -3,7 +3,7 @@ class SfsErrorCodes(StrEnum): SFS_INVALID_KEY = "sfs/invalid-key" - SFS_INVALID_TAGS = "sfs/invalid-tags" + SFS_INVALID_TAGS_FORMAT = "sfs/invalid-tags-format" SFS_INVALID_DATA = "sfs/invalid-data" SFS_INVALID_NAME = "sfs/invalid-name" SFS_INVALID_FILE = "sfs/invalid-file" @@ -15,3 +15,4 @@ class SfsErrorCodes(StrEnum): REQUEST_VALIDATION_ERROR = "app/request-validation-error" SFS_BUCKET_NAME_ALREADY_EXIST = "sfs/bucket-name-alreay-exist" AUTH_ACCESS_DENIED = "app/service-access-denied" + SFS_FILE_NOT_FOUND = "sfs/file-not-found" diff --git a/src/common/functional.py b/src/common/functional.py index b5feab0..17777e6 100644 --- a/src/common/functional.py +++ b/src/common/functional.py @@ -81,7 +81,7 @@ def format_bucket(bucket_name: str) -> str: return formatted -def generate_media_name(extension: str) -> str: +async def generate_media_name(extension: str) -> str: """ Generates a unique media_router file name using the current timestamp, a UUID, and the provided file extension. diff --git a/src/models/media.py b/src/models/media.py index bfc1d8b..4ae522a 100644 --- a/src/models/media.py +++ b/src/models/media.py @@ -1,5 +1,8 @@ +from urllib.parse import urljoin + from beanie import Document -from pydantic import Field +from pydantic import computed_field, Field +from pymongo import ASCENDING, IndexModel from src.config import settings from src.schemas import MediaSchema @@ -11,4 +14,18 @@ class Media(Document, MediaSchema, DatetimeTimestamp): class Settings: name = settings.MEDIA_DB_COLLECTION.split(".")[1] - use_state_management = True + indexes = [ + IndexModel( + [("bucket_name", ASCENDING), ("name_in_minio", ASCENDING)], + unique=True, + name="bucket_name_name_in_minio_index", + ) + ] + + @computed_field + def media_url(self) -> str: + file_path = f"{self.bucket_name}/{self.name_in_minio}" + if self.is_public: + return urljoin(settings.STORAGE_BROWSER_REDIRECT_URL, f"/media/public/{file_path}") + else: + return urljoin(settings.STORAGE_BROWSER_REDIRECT_URL, f"/media/{file_path}") diff --git a/src/routers/media.py b/src/routers/media.py index c41a760..a8147da 100644 --- a/src/routers/media.py +++ b/src/routers/media.py @@ -1,22 +1,22 @@ import json -from typing import Optional from mimetypes import guess_type +from typing import Optional import boto3 from fastapi import APIRouter, BackgroundTasks, Depends, File, Form, Query, status, UploadFile -from fastapi.responses import StreamingResponse +from fastapi.responses import JSONResponse, StreamingResponse from fastapi_pagination.async_paginator import paginate as async_paginate from pymongo import ASCENDING, DESCENDING from src.common.boto_client import check_bucket_exists, get_boto_client from src.common.error_codes import SfsErrorCodes -from src.common.permissions import CheckAccessAllow from src.common.exception import CustomHTTPException from src.common.functional import customize_page +from src.common.permissions import CheckAccessAllow from src.common.utils import SortEnum from src.models import Media from src.schemas import MediaFilter -from src.services import delete_media_if_exist_from_mongo, download_media, get_media, upload_media +from src.services import delete_media_if_exist_from_mongo, download_media, find_public_media, get_media, upload_media media_router: APIRouter = APIRouter( prefix="/media", @@ -32,8 +32,17 @@ summary="Upload a file to an S3 object.", status_code=status.HTTP_202_ACCEPTED, ) +@media_router.post( + "/_write", + response_model=Media, + summary="Upload a file to an S3 object (unsecured).", + status_code=status.HTTP_202_ACCEPTED, + include_in_schema=False, +) async def upload_file_to_buckect( bucket_name: str = Form(..., description="Bucket name to upload the file"), + is_public: bool = Form(False, description="If the file is public or not"), + ttl_minutes: Optional[int] = Form(None, description="Time to live in minutes for the file"), file: UploadFile = File(..., description="File to be uploaded"), tags: Optional[str] = Form( None, @@ -44,14 +53,21 @@ async def upload_file_to_buckect( ): try: tags_dict = json.loads(tags) if tags else {} - except json.JSONDecodeError as exc: + except (json.JSONDecodeError, Exception) as exc: raise CustomHTTPException( - error_code=SfsErrorCodes.SFS_INVALID_TAGS, - error_message="Invalid JSON string for tags.", + error_code=SfsErrorCodes.SFS_INVALID_TAGS_FORMAT, + error_message="Tags should be like: \"{'key': 'value'}\" dumped. \n Error: " f"{str(exc)}", status_code=status.HTTP_400_BAD_REQUEST, ) from exc - result = await upload_media(botoclient=botoclient, bucket_name=bucket_name, tags=tags_dict, file=file) + result = await upload_media( + botoclient=botoclient, + bucket_name=bucket_name, + tags=tags_dict, + file=file, + is_public=is_public, + ttl_minutes=ttl_minutes, + ) return result @@ -71,13 +87,17 @@ async def list_media( if query.bucket_name: check_bucket_exists(bucket_name=query.bucket_name, botoclient=botoclient) search.update({"bucket_name": {"$regex": query.bucket_name, "$options": "i"}}) + if query.filename: + search.update({"name_in_minio": {"$regex": query.filename, "$options": "i"}}) + if query.public: + search.update({"is_public": query.public}) if query.tags: del query["tags"] search.update({f"tags.{k}": v for k, v in query.tags.items()}) sorted = DESCENDING if sort == SortEnum.DESC else ASCENDING medias = await Media.find(search, sort=[("created_at", sorted)]).to_list() - media = [await get_media(filename=m.name_in_minio, bucket_name=m.bucket_name) for m in medias if m] + media = [await get_media(filename=media.name_in_minio, bucket_name=media.bucket_name) for media in medias if media] return await async_paginate(media) @@ -94,7 +114,7 @@ async def get_media_obj( bg: BackgroundTasks, bucket_name: str, filename: str, - download: bool = Query(default=False), + download: bool = Query(default=False, description="Download the file"), botoclient: boto3.client = Depends(get_boto_client), ): if download: @@ -123,7 +143,39 @@ async def get_media_obj( "/{bucket_name}/{filename}", dependencies=[Depends(CheckAccessAllow(permissions={"sfs:can-delete-file"}))], summary="Delete a file from a bucket", - status_code=status.HTTP_204_NO_CONTENT, + status_code=status.HTTP_200_OK, ) async def delete_file(bucket_name: str, filename: str, botoclient: boto3.client = Depends(get_boto_client)): await delete_media_if_exist_from_mongo(bucket_name=bucket_name, filename=filename, botoclient=botoclient) + return JSONResponse(status_code=status.HTTP_200_OK, content={"message": "File deleted successfully."}) + + +@media_router.get( + "/public/{bucket_name}/{filename}", + summary="Retrieve public media", + status_code=status.HTTP_200_OK, +) +async def get_public_media(bucket_name: str, filename: str, botoclient: boto3.client = Depends(get_boto_client)): + items_found = await find_public_media(bucket_name=bucket_name, filename=filename) + + if not items_found: + raise CustomHTTPException( + error_code=SfsErrorCodes.SFS_FILE_NOT_FOUND, + error_message=f"File {filename} not found in bucket {bucket_name} or is not public or expirartion date exceeded.", + status_code=status.HTTP_404_NOT_FOUND, + ) + + # retrieve the media from MinIO and stream it + try: + media = await get_media(bucket_name=bucket_name, filename=filename, botoclient=botoclient) + except Exception as exc: + raise CustomHTTPException( + error_code=SfsErrorCodes.SFS_FILE_NOT_FOUND, + error_message=str(exc), + status_code=status.HTTP_404_NOT_FOUND, + ) from exc + + return StreamingResponse( + content=media, + media_type="application/octet-stream", + ) diff --git a/src/schemas/media.py b/src/schemas/media.py index 20a609b..63171fa 100644 --- a/src/schemas/media.py +++ b/src/schemas/media.py @@ -8,8 +8,12 @@ class MediaSchema(BaseModel): bucket_name: str = Field(..., description="Bucket name") name_in_minio: str = Field(..., description="Media object name in minio") tags: dict = Field(None, description="list of tags") + is_public: Optional[bool] = Field(False, description="Is media public") + ttl_minutes: Optional[int] = Field(None, description="Time to live in minutes") class MediaFilter(BaseModel): bucket_name: Optional[str] = Field(None, description="Bucket name") - tags: Optional[dict] = Field(None, description="Media tags") + filename: Optional[str] = Field(None, description="Media filename") + public: Optional[bool] = Field(None, description="Is media public") + tags: Optional[dict] = Field(None, description="Media tags", examples=['{"key":"value"}']) diff --git a/src/services/__init__.py b/src/services/__init__.py index 53b7f34..10c4459 100644 --- a/src/services/__init__.py +++ b/src/services/__init__.py @@ -1,2 +1,2 @@ from .bucket import create_new_bucket, get_or_create_bucket, delete_bucket # noqa: F401 -from .media import upload_media, get_media, delete_media_if_exist_from_mongo, download_media # noqa: F401 +from .media import upload_media, get_media, delete_media_if_exist_from_mongo, download_media, find_public_media # noqa: F401 diff --git a/src/services/media.py b/src/services/media.py index 0221ccc..08b60bf 100644 --- a/src/services/media.py +++ b/src/services/media.py @@ -1,5 +1,6 @@ import os import tempfile +from datetime import datetime from typing import Optional from urllib.parse import urljoin @@ -10,7 +11,7 @@ from typing_extensions import deprecated from urllib3 import BaseHTTPResponse, HTTPResponse -from src.common.boto_client import get_boto_client +from src.common.boto_client import get_boto_client, check_bucket_exists from src.common.error_codes import SfsErrorCodes from src.common.exception import CustomHTTPException from src.common.functional import format_bucket, generate_media_name, replace_minio_url_base @@ -26,6 +27,8 @@ def _upload_media_to_minio( bucket_name: str = Depends(format_bucket), botoclient: boto3.client = Depends(get_boto_client), ): + check_bucket_exists(bucket_name, botoclient=botoclient) + with tempfile.NamedTemporaryFile(delete=False) as temp_file: temp_file.write(file.file.read()) @@ -36,7 +39,6 @@ def _upload_media_to_minio( extra_args["Tagging"] = "&".join([f"{tag['Key']}:{tag['Value']}" for tag in tag_set]) response = botoclient.upload_file(Filename=temp_file.name, Bucket=bucket_name, Key=key, ExtraArgs=extra_args) - os.remove(temp_file.name) except (exceptions.ClientError, exceptions.BotoCoreError) as exc: error_message = exc.response.get("Error", {}).get("Message", "An error occurred") status_code = exc.response.get("ResponseMetadata", {}).get("HTTPStatusCode", status.HTTP_400_BAD_REQUEST) @@ -44,6 +46,8 @@ def _upload_media_to_minio( error_code=SfsErrorCodes.SFS_INVALID_NAME, error_message=error_message, status_code=status_code ) from exc + os.remove(temp_file.name) + return response @@ -100,13 +104,20 @@ async def upload_media( bucket_name: str, tags: Optional[dict] = None, file: UploadFile = File(...), + is_public: Optional[bool] = False, + ttl_minutes: Optional[int] = None, botoclient: boto3.client = Depends(get_boto_client), ): extension = file.filename.split(".")[-1] - media_name = generate_media_name(extension=extension) + media_name = await generate_media_name(extension=extension) media_schema = MediaSchema( - bucket_name=bucket_name, name_in_minio=media_name, tags=tags if tags else None, filename=media_name + bucket_name=bucket_name, + name_in_minio=media_name, + tags=tags if tags else None, + filename=media_name, + is_public=is_public, + ttl_minutes=ttl_minutes, ) media = await _save_media(media=media_schema, file=file, botoclient=botoclient) return media @@ -189,3 +200,24 @@ async def download_media( ) return response + + +async def find_public_media(bucket_name: str, filename: str): + pipeline = [ + {"$match": {"bucket_name": bucket_name, "name_in_minio": filename, "is_public": True}}, + { + "$addFields": { + "expiration_time": { + "$cond": { + "if": {"$ne": ["$ttl_minutes", None]}, + "then": {"$add": ["$updated_at", {"$multiply": ["$ttl_minutes", 60000]}]}, # 1mn -> ms + "else": None, + } + } + } + }, + {"$match": {"$or": [{"expiration_time": {"$gt": datetime.now()}}, {"expiration_time": None}]}}, + ] + + media_from_db = await Media.aggregate(pipeline).to_list() + return media_from_db diff --git a/tests/routers/test_media_api.py b/tests/routers/test_media_api.py index 4764522..835f568 100644 --- a/tests/routers/test_media_api.py +++ b/tests/routers/test_media_api.py @@ -91,8 +91,7 @@ async def test_upload_media_invalid_tags(http_client_api, default_bucket, fake_d headers={"Authorization": "Bearer token"}, ) assert response.status_code == status.HTTP_400_BAD_REQUEST, response.text - assert response.json()["error_code"] == SfsErrorCodes.SFS_INVALID_TAGS - assert response.json()["error_message"] == "Invalid JSON string for tags." + assert response.json()["error_code"] == SfsErrorCodes.SFS_INVALID_TAGS_FORMAT mock_check_access_allow.assert_called_once() @@ -122,8 +121,7 @@ async def test_upload_media_invalid_file(http_client_api, default_bucket, mock_c headers={"Authorization": "Bearer token"}, ) assert response.status_code == status.HTTP_400_BAD_REQUEST, response.text - assert response.json()["error_code"] == SfsErrorCodes.SFS_INVALID_TAGS - assert response.json()["error_message"] == "Invalid JSON string for tags." + assert response.json()["error_code"] == SfsErrorCodes.SFS_INVALID_TAGS_FORMAT mock_check_access_allow.assert_called_once() @@ -134,6 +132,7 @@ async def test_delete_media(http_client_api, default_media, mock_check_access_al bucket_name, filename = default_media.bucket_name, default_media.filename response = await http_client_api.delete(f"/media/{bucket_name}/{filename}", headers={"Authorization": "Bearer token"}) - assert response.status_code == status.HTTP_204_NO_CONTENT, response.text + assert response.status_code == status.HTTP_200_OK, response.text + assert response.json()["message"] == "File deleted successfully." mock_check_access_allow.assert_called_once()