|
| 1 | +# SPDX-FileCopyrightText: 2022 Contributors to the Power Grid Model project <dynamic.grid.calculation@alliander.com> |
| 2 | +# |
| 3 | +# SPDX-License-Identifier: MPL-2.0 |
| 4 | +""" |
| 5 | +Helper functions to download (and store) files from the internet |
| 6 | +
|
| 7 | +The most simple (and intended) usage is: |
| 8 | +url = "http://141.51.193.167/simbench/gui/usecase/download/?simbench_code=1-complete_data-mixed-all-0-sw&format=csv" |
| 9 | +zip_file_path = download(url) |
| 10 | +
|
| 11 | +It will download the zip file 1-complete_data-mixed-all-0-sw.zip to a folder in your system's temp dir; for example |
| 12 | +"/tmp/1-complete_data-mixed-all-0-sw.zip". |
| 13 | +
|
| 14 | +Another convenience function is download_and_extract(): |
| 15 | +
|
| 16 | +csv_dir_path = download_and_extract(url) |
| 17 | +
|
| 18 | +This downloads the zip file as described above, and then it extracts the files there as well, in a folder which |
| 19 | +corresponds to the zip file name ("/tmp/1-complete_data-mixed-all-0-sw/" in our example), and it returns the path to |
| 20 | +that directory. By default, it will not re-download or re-extract the zip file as long as the files exist in your |
| 21 | +temp dir. Your temp dir is typically emptied when you reboot your computer. |
| 22 | +
|
| 23 | +""" |
| 24 | + |
| 25 | +import base64 |
| 26 | +import hashlib |
| 27 | +import re |
| 28 | +import tempfile |
| 29 | +from dataclasses import dataclass |
| 30 | +from pathlib import Path |
| 31 | +from shutil import rmtree as remove_dir |
| 32 | +from typing import Optional, Union |
| 33 | +from urllib import request |
| 34 | + |
| 35 | +import structlog |
| 36 | +from tqdm import tqdm |
| 37 | + |
| 38 | +from power_grid_model_io.utils.zip import extract |
| 39 | + |
| 40 | +_log = structlog.get_logger(__name__) |
| 41 | + |
| 42 | + |
@dataclass
class ResponseInfo:
    """
    Summary of a response: its status plus the details announced in the response headers
    """

    # Status of the response
    status: int
    # File name announced in the Content-Disposition header (None if not announced)
    file_name: Optional[str] = None
    # Payload size in bytes, announced in the Content-Length header (None if not announced)
    file_size: Optional[int] = None
| 52 | + |
| 53 | + |
class DownloadProgressHook:  # pylint: disable=too-few-public-methods
    """
    Report hook for request.urlretrieve() that keeps a progress bar in sync with the number of downloaded bytes
    """

    def __init__(self, progress_bar: "tqdm"):
        """
        Report hook for request.urlretrieve() that keeps a progress bar in sync with the number of downloaded bytes

        Args:
            progress_bar: A tqdm progress bar; only its `total` attribute and its `update()` method are used
        """
        self._progress_bar = progress_bar
        self._downloaded = 0  # number of bytes reported to the progress bar so far

    def __call__(self, block_num: int, block_size: int, file_size: int) -> None:
        """
        Args:
            block_num: The number of blocks transferred so far
            block_size: The block size in bytes
            file_size: The total file size in bytes (zero or negative if the size is not known)

        """
        downloaded = block_num * block_size
        if file_size > 0:
            self._progress_bar.total = file_size
            # Whole blocks are reported, but the last block is usually only partially filled.
            # Never report more bytes than the file actually contains.
            downloaded = min(downloaded, file_size)
        self._progress_bar.update(downloaded - self._downloaded)
        self._downloaded = downloaded
| 81 | + |
| 82 | + |
def download_and_extract(
    url: str, dir_path: Optional[Path] = None, file_name: Optional[Union[str, Path]] = None, overwrite: bool = False
) -> Path:
    """
    Fetch a .zip file from a URL, unpack it next to the downloaded file and return the location of the contents.

    Args:
        url: The url to the .zip file
        dir_path: An optional dir path to store the downloaded file; it is handed to download() unchanged.
        file_name: An optional file name (or path relative to dir_path); it is handed to download() unchanged.
        overwrite: If True, the file is downloaded again and a previously extracted directory is removed before
                   extracting. Be careful with this option, as it will remove files from your drive irreversibly!

    Returns:
        The result of extract() for the extraction directory, i.e. the download path without its (last) suffix
    """

    archive_path = download(url=url, file_name=file_name, dir_path=dir_path, overwrite=overwrite)

    # The extraction directory is named after the archive, minus its (last) suffix
    target_dir = archive_path.with_suffix("")

    # A forced refresh starts from a clean slate: remove whatever was extracted earlier
    if overwrite and target_dir.is_dir():
        remove_dir(target_dir)

    # Without overwrite, an existing extraction directory is reused as is
    keep_existing = not overwrite
    return extract(src_file_path=archive_path, dst_dir_path=target_dir, skip_if_exists=keep_existing)
| 112 | + |
| 113 | + |
def download(
    url: str, file_name: Optional[Union[str, Path]] = None, dir_path: Optional[Path] = None, overwrite: bool = False
) -> Path:
    """
    Download a file from a URL and store it locally

    Args:
        url: The url to the file
        file_name: An optional file name (or path relative to dir_path). If no file_name is given, the file name
                   announced by the server is used; if there is none, get_download_path() generates one from the url
        dir_path: An optional dir path to store the downloaded file. If no dir_path is given, get_download_path()
                  chooses the location.
        overwrite: Should we download the file, even if we have downloaded already (and the file size still matches)?

    Returns:
        The path to the downloaded file

    Raises:
        IOError: If the response status is not 200
    """
    # Function-scope import, so this function does not depend on additional module level imports
    from shutil import move as move_file  # pylint: disable=import-outside-toplevel

    # Get the response info first; if the status is not 200, there is nothing to download
    info = get_response_info(url=url)
    if info.status != 200:
        raise IOError(f"Could not download from URL, status={info.status}")

    # Prefer an explicitly supplied file name over the one announced by the server
    if file_name is None and info.file_name:
        file_name = info.file_name

    file_path = get_download_path(dir_path=dir_path, file_name=file_name, unique_key=url)
    log = _log.bind(url=url, file_path=file_path)

    if file_path.is_file():
        if overwrite:
            log.debug("Forced re-downloading existing file")
            # Don't remove the existing file just yet... Let's first see if we can download a new version.
        else:
            local_size = file_path.stat().st_size
            if local_size == info.file_size:
                log.debug("Skip downloading existing file")
                return file_path
            log.debug(
                "Re-downloading existing file, because the size has changed",
                local_size=local_size,
                remote_size=info.file_size,
            )
    else:
        log.debug("Downloading file")

    # Download to a temp file first, so the results are not stored if the transfer fails
    with tqdm(desc="Downloading", unit="B", unit_scale=True, leave=True) as progress_bar:
        report_hook = DownloadProgressHook(progress_bar)
        temp_file, _headers = request.urlretrieve(url, reporthook=report_hook)

    # Check if the file contains any content
    temp_path = Path(temp_file)
    if temp_path.stat().st_size == 0:
        log.warning("Downloaded an empty file")

    # Remove the file, if it already exists
    file_path.unlink(missing_ok=True)

    # Move the file to its final destination. A plain rename fails if the temp file and the destination are on
    # different file systems; shutil.move falls back to copy-and-delete in that case.
    file_path.parent.mkdir(parents=True, exist_ok=True)
    move_file(str(temp_path), str(file_path))
    log.debug("Downloaded file", file_size=file_path.stat().st_size)

    return file_path
| 179 | + |
| 180 | + |
def get_response_info(url: str) -> ResponseInfo:
    """
    Open a URL and collect the status, file size and file name from the response (headers)

    Args:
        url: The url to the file

    Return:
        A ResponseInfo containing the response status, the file size in bytes as announced in the Content-Length
        header (None if absent or not a number) and the file name as announced in the Content-Disposition header
        (None if absent)
    """
    with request.urlopen(url) as context:
        status = context.status
        headers = context.headers

    # A missing or malformed Content-Length simply means that the size is unknown
    content_length = headers.get("Content-Length")
    try:
        file_size = int(content_length) if content_length is not None else None
    except ValueError:
        file_size = None

    # Match up to the first closing quote only, so that parameters following the file name are not captured
    matches = re.findall(r"filename=\"([^\"]+)\"", headers.get("Content-Disposition", ""))
    file_name = None
    if matches:
        # The header is supplied by the server; drop any directory components, so that the name can never
        # point outside of the intended download directory
        base_name = re.split(r"[\\/]", matches[0])[-1]
        if base_name not in ("", ".", ".."):
            file_name = base_name

    return ResponseInfo(status=status, file_size=file_size, file_name=file_name)
| 199 | + |
| 200 | + |
def get_download_path(
    dir_path: Optional[Path] = None,
    file_name: Optional[Union[str, Path]] = None,
    unique_key: Optional[str] = None,
) -> Path:
    """
    Determine the file path based on dir_path, file_name and/or unique_key

    Args:
        dir_path: An optional dir path to store the downloaded file. If no dir_path is given the system's temp dir
                  will be used.
        file_name: An optional file name (or path relative to dir_path). If no file_name is given, a file name is
                   generated based on the unique key (e.g. an url)
        unique_key: A unique string that can be used to generate a filename (e.g. a url).

    Returns:
        The resolved (absolute) path of the file

    Raises:
        ValueError: If neither file_name nor unique_key is given, or if the path exists but is not a file
    """

    # If no file_name is given, generate a file name
    if file_name is None:
        if unique_key is None:
            raise ValueError("Supply data in order to auto generate a download path.")

        # URL-safe base64 uses "-" and "_" instead of "+" and "/", which makes the hash usable as a file name
        digest = hashlib.sha256(unique_key.encode()).digest()
        hash_str = base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=")
        file_name = Path(f"{hash_str}.download")

    # If no dir_path is given, use the system's designated folder for temporary files.
    # This applies to supplied and generated file names alike.
    if dir_path is None:
        dir_path = Path(tempfile.gettempdir())

    # Combine the two paths
    file_path = Path(dir_path) / file_name

    # If the file_path exists, it should be a file (not a dir)
    if file_path.exists() and not file_path.is_file():
        raise ValueError(f"Invalid file path: {file_path}")

    return file_path.resolve()
0 commit comments