diff --git a/gutenberg/acquire/text.py b/gutenberg/acquire/text.py index c9ab11e..17886b4 100644 --- a/gutenberg/acquire/text.py +++ b/gutenberg/acquire/text.py @@ -52,10 +52,14 @@ def _check_mirror_exists(mirror): .format(mirror)) -def _format_download_uri(etextno, mirror=None): +def _format_download_uri(etextno, mirror=None, prefer_ascii=False): """Returns the download location on the Project Gutenberg servers for a given text. + Use prefer_ascii to control whether you want to fetch plaintext us-ascii + file first (default old behavior) or if you prefer UTF-8 then 8-bits then + plaintext. + Raises: UnknownDownloadUri: If no download location can be found for the text. """ @@ -63,7 +67,14 @@ def _format_download_uri(etextno, mirror=None): uri_root = uri_root.strip().rstrip('/') _check_mirror_exists(uri_root) - extensions = ('.txt', '-8.txt', '-0.txt') + # Check https://www.gutenberg.org/files/ for details about available + # extensions ; + # - .txt is plaintext us-ascii + # - -8.txt is 8-bit plaintext, multiple encodings + # - -0.txt is UTF-8 + ascii_first = ('.txt', '-0.txt', '-8.txt') + utf8_first = ('-0.txt', '-8.txt', '.txt') + extensions = ascii_first if prefer_ascii else utf8_first for extension in extensions: path = _etextno_to_uri_subdirectory(etextno) uri = '{root}/{path}/{etextno}{extension}'.format( @@ -79,7 +90,7 @@ def _format_download_uri(etextno, mirror=None): .format(etextno, uri_root)) -def load_etext(etextno, refresh_cache=False, mirror=None): +def load_etext(etextno, refresh_cache=False, mirror=None, prefer_ascii=False): """Returns a unicode representation of the full body of a Project Gutenberg text. After making an initial remote call to Project Gutenberg's servers, the text is persisted locally. @@ -92,8 +103,18 @@ def load_etext(etextno, refresh_cache=False, mirror=None): remove(cached) if not os.path.exists(cached): makedirs(os.path.dirname(cached)) - download_uri = _format_download_uri(etextno, mirror) + download_uri = _format_download_uri(etextno, mirror, prefer_ascii) response = requests.get(download_uri) + # Ensure proper UTF-8 saving. There might be instances of ebooks or + # mirrors which advertise a broken encoding, and this will break + # downstream usages. For example, #55517 from aleph.gutenberg.org: + # + # from gutenberg.acquire import load_etext + # print(load_etext(55517, refresh_cache=True)[0:1000]) + # + # response.encoding will be 'ISO-8859-1' while the file is UTF-8 + if response.encoding != response.apparent_encoding: + response.encoding = response.apparent_encoding text = response.text with closing(gzip.open(cached, 'w')) as cache: cache.write(text.encode('utf-8')) @@ -115,12 +136,15 @@ def _main(): parser.add_argument('etextno', type=int) parser.add_argument('outfile', type=FileType('w')) parser.add_argument('--mirror', '-m', type=str) + parser.add_argument('--prefer-ascii', '-a', type=bool, default=False) args = parser.parse_args() mirror = args.mirror or os.environ.get('GUTENBERG_MIRROR') try: - text = load_etext(args.etextno, mirror=mirror) + text = load_etext(args.etextno, + mirror=mirror, + prefer_ascii=args.prefer_ascii) with reopen_encoded(args.outfile, 'w', 'utf8') as outfile: outfile.write(text) except Error as error: diff --git a/setup.py b/setup.py index 532e229..fb3c8a0 100644 --- a/setup.py +++ b/setup.py @@ -39,7 +39,7 @@ def requirements_for(version=None): setup( name='Gutenberg', - version='0.6.1', + version='0.7.0', author='Clemens Wolff', author_email='clemens.wolff+pypi@gmail.com', packages=find_packages(exclude=['tests']), diff --git a/tests/test_acquire.py b/tests/test_acquire.py index 8a62629..5e388b5 100644 --- a/tests/test_acquire.py +++ b/tests/test_acquire.py @@ -80,6 +80,81 @@ def test_unreachable_mirror(self): with self.assertRaises(UnknownDownloadUriException): text.load_etext(1) +class TestExtensionsLoadEtext(unittest.TestCase): + def setUp(self): + self._original_head = text.requests.head + self._original_check = text._check_mirror_exists + + def tearDown(self): + text.requests.head = self._original_head + text._check_mirror_exists = self._original_check + + def request_head_response(self, valid_files): + response = namedtuple('Response', 'ok') + + def head(*args, **kwargs): + req_file = args[0].split('/')[-1] + return response(req_file in valid_files) + text.requests.head = head + + def mirror_exist(*args, **kwargs): + return response(True) + text._check_mirror_exists = mirror_exist + + def test_extensions_order_utf8_only(self): + utf8_filename = '12345-0.txt' + self.request_head_response(valid_files=[utf8_filename]) + + extensions = text._format_download_uri(12345) + self.assertEqual(extensions.split('/')[-1], utf8_filename) + + extensions = text._format_download_uri(12345, prefer_ascii=False) + self.assertEqual(extensions.split('/')[-1], utf8_filename) + + def test_extensions_order_ascii_only(self): + ascii_filename = '12345.txt' + self.request_head_response(valid_files=[ascii_filename]) + + extensions = text._format_download_uri(12345) + self.assertEqual(extensions.split('/')[-1], ascii_filename) + + extensions = text._format_download_uri(12345, prefer_ascii=True) + self.assertEqual(extensions.split('/')[-1], ascii_filename) + + def test_extensions_order_utf8_first(self): + utf8_filename = '12345-0.txt' + all_files = ['12345.txt', '12345-8.txt', '12345-0.txt'] + self.request_head_response(valid_files=all_files) + + extensions = text._format_download_uri(12345) + self.assertEqual(extensions.split('/')[-1], utf8_filename) + + extensions = text._format_download_uri(12345, prefer_ascii=False) + self.assertEqual(extensions.split('/')[-1], utf8_filename) + + def test_extensions_order_ascii_first(self): + ascii_filename = '12345.txt' + all_files = ['12345-8.txt', '12345-0.txt', '12345.txt'] + self.request_head_response(valid_files=all_files) + + extensions = text._format_download_uri(12345) + self.assertNotEqual(extensions.split('/')[-1], ascii_filename) + + extensions = text._format_download_uri(12345, prefer_ascii=True) + self.assertEqual(extensions.split('/')[-1], ascii_filename) + + def test_extensions_order_eightbit_first(self): + eightbit_filename = '12345-8.txt' + ascii_filename = '12345.txt' + all_files = ['12345-8.txt', '12345.txt'] + self.request_head_response(valid_files=all_files) + + extensions = text._format_download_uri(12345) + self.assertEqual(extensions.split('/')[-1], eightbit_filename) + + extensions = text._format_download_uri(12345, prefer_ascii=True) + self.assertEqual(extensions.split('/')[-1], ascii_filename) + if __name__ == '__main__': unittest.main()