diff --git a/dev_requirements/requirements-coverage.in b/dev_requirements/requirements-coverage.in index d1e1a80..987838e 100644 --- a/dev_requirements/requirements-coverage.in +++ b/dev_requirements/requirements-coverage.in @@ -1,2 +1,3 @@ # specific requirements for the tox coverage env coverage +pytest_loguru diff --git a/dev_requirements/requirements-coverage.txt b/dev_requirements/requirements-coverage.txt index 7829865..b24cbfa 100644 --- a/dev_requirements/requirements-coverage.txt +++ b/dev_requirements/requirements-coverage.txt @@ -1,9 +1,27 @@ -# SHA1:6dafbcf610e9f81897b65ee9142715ab2e793f9e +# SHA1:5c99babe62754a9fe23771e877f18d99a3faa208 # # This file is autogenerated by pip-compile-multi # To update, run: # # pip-compile-multi # +colorama==0.4.6 + # via + # loguru + # pytest coverage==7.4.3 # via -r dev_requirements\requirements-coverage.in +iniconfig==2.0.0 + # via pytest +loguru==0.7.2 + # via pytest-loguru +packaging==24.0 + # via pytest +pluggy==1.4.0 + # via pytest +pytest==8.1.1 + # via pytest-loguru +pytest-loguru==0.3.0 + # via -r dev_requirements\requirements-coverage.in +win32-setctime==1.1.0 + # via loguru diff --git a/dev_requirements/requirements-tests.in b/dev_requirements/requirements-tests.in index 5497961..4dacd38 100644 --- a/dev_requirements/requirements-tests.in +++ b/dev_requirements/requirements-tests.in @@ -1,2 +1,3 @@ # specific requirements for the tox tests env pytest +pytest_loguru # makes loguru logs accessible to pytest diff --git a/dev_requirements/requirements-tests.txt b/dev_requirements/requirements-tests.txt index 2c7326b..3d4ba11 100644 --- a/dev_requirements/requirements-tests.txt +++ b/dev_requirements/requirements-tests.txt @@ -1,4 +1,4 @@ -# SHA1:0eaa389e1fdb3a1917c0f987514bd561be5718ee +# SHA1:c2ec5869b1beb7fc76c4dcc722605bc1512f62aa # # This file is autogenerated by pip-compile-multi # To update, run: @@ -6,12 +6,22 @@ # pip-compile-multi # colorama==0.4.6 - # via pytest + # via + # loguru + # pytest 
iniconfig==2.0.0 # via pytest +loguru==0.7.2 + # via pytest-loguru packaging==24.0 # via pytest pluggy==1.4.0 # via pytest pytest==8.1.1 + # via + # -r dev_requirements\requirements-tests.in + # pytest-loguru +pytest-loguru==0.3.0 # via -r dev_requirements\requirements-tests.in +win32-setctime==1.1.0 + # via loguru diff --git a/src/migmose/__main__.py b/src/migmose/__main__.py index 049c804..72885d3 100644 --- a/src/migmose/__main__.py +++ b/src/migmose/__main__.py @@ -2,19 +2,13 @@ contains CLI logic for migmose. """ -import json from pathlib import Path -from typing import Generator, Union import click -import docx # type: ignore[import] -from docx.document import Document # type: ignore[import] -from docx.oxml import CT_Tbl # type: ignore[import] -from docx.table import Table, _Cell # type: ignore[import] -from docx.text.paragraph import Paragraph # type: ignore[import] -from loguru import logger from maus.edifact import EdifactFormat +from migmose.parsing import find_file_to_format, parse_raw_nachrichtenstrukturzeile, preliminary_output_as_json + # add CLI logic @click.command() @@ -27,11 +21,11 @@ ) @click.option( "-mt", - "--message_type", + "--message_format", type=click.Choice(list(map(lambda x: x.name, EdifactFormat)), case_sensitive=False), # Taken from https://github.com/pallets/click/issues/605#issuecomment-889462570 - prompt="Please specify which message type to be parsed.", - help="Defines the set of message types to be parsed.", + prompt="Please specify which message format to be parsed.", + help="Defines the set of message formats to be parsed.", multiple=True, ) @click.option( @@ -41,85 +35,16 @@ prompt="Please enter the path to the directory which should contain the output files.", help="Set path to directory which contains the output files. 
If the directory does not exist, it will be created.", ) -def main(input_dir: Path, output_dir, message_type: list[EdifactFormat]) -> None: +def main(input_dir: Path, output_dir, message_format: list[EdifactFormat]) -> None: """ Main function. Uses CLI input. """ - dict_files = find_file_to_type(message_type, input_dir) - for m_type, file in dict_files.items(): + dict_files = find_file_to_format(message_format, input_dir) + for m_format, file in dict_files.items(): mig_table = parse_raw_nachrichtenstrukturzeile(file) for item in mig_table: print(item) - preliminary_output_as_json(mig_table, m_type, output_dir) - - -def find_file_to_type(message_types: list[EdifactFormat], input_dir: Path) -> dict[EdifactFormat, Path]: - """ - finds the file with the message type in the input directory - """ - file_dict = {} - for message_type in message_types: - list_of_all_files = [ - file for file in input_dir.iterdir() if message_type in file.name and file.suffix == ".docx" - ] - if len(list_of_all_files) == 1: - file_dict[message_type] = list_of_all_files[0] - elif len(list_of_all_files) > 1: - logger.warning(f"⚠️ There are several files for {message_type}.", fg="red") - else: - logger.warning(f"⚠️ No file found for {message_type}.", fg="red") - if file_dict: - return file_dict - logger.error("⚠️ No files found in the input directory.", fg="red") - raise click.Abort() - - -def preliminary_output_as_json(table: list[str], message_type: EdifactFormat, output_dir: Path) -> None: - """ - writes the preliminary output as json - """ - if not output_dir.exists(): - output_dir.mkdir(parents=True, exist_ok=True) - file_path = output_dir.joinpath(f"{message_type}_preliminary_output.json") - structured_json = {line: None for line in table} - with open(file_path, "w", encoding="utf-8") as json_file: - json.dump(structured_json, json_file, indent=4, encoding="utf-8") - logger.info(f"Created and wrote to {file_path}") - - -def get_paragraphs_up_to_diagram(parent: Union[Document, _Cell]) -> 
Generator[Union[Paragraph, Table], None, None]: - """Goes through paragraphs and tables""" - # pylint: disable=protected-access - if isinstance(parent, Document): - parent_elm = parent.element.body - elif isinstance(parent, _Cell): - parent_elm = parent._tc - else: - raise ValueError("Passed parent argument must be of type Document or _Cell") - - for child in parent_elm.iterchildren(): - if isinstance(child, CT_Tbl): - yield Table(child, parent) - - -def parse_raw_nachrichtenstrukturzeile(input_path: Path) -> list[str]: - """ - parses raw nachrichtenstrukturzeile from a table. returns list of raw lines - """ - # pylint: disable=protected-access - doc = docx.Document(input_path) - docx_objects = get_paragraphs_up_to_diagram(doc) - mig_tables = [] - nachrichtenstruktur_header = "Status\tMaxWdh\n\tZähler\tNr\tBez\tSta\tBDEW\tSta\tBDEW\tEbene\tInhalt" - for docx_object in docx_objects: - for ind, line in enumerate(docx_object._cells): - # marks the beginning of the complete nachrichtentruktur table - if line.text == nachrichtenstruktur_header: - mig_tables.extend([row.text for row in docx_object._cells[ind + 1 :]]) - break - # filter empty rows and headers - mig_tables = [row for row in mig_tables if row not in ("\n", nachrichtenstruktur_header)] - return mig_tables + preliminary_output_as_json(mig_table, m_format, output_dir) if __name__ == "__main__": diff --git a/src/migmose/parsing.py b/src/migmose/parsing.py new file mode 100644 index 0000000..c8ebb00 --- /dev/null +++ b/src/migmose/parsing.py @@ -0,0 +1,85 @@ +""" +contains functions for file handling and parsing. 
+"""
+
+import json
+from pathlib import Path
+from typing import Generator, Union
+
+import click
+import docx  # type: ignore[import]
+from docx.document import Document  # type: ignore[import]
+from docx.oxml import CT_Tbl  # type: ignore[import]
+from docx.table import Table, _Cell  # type: ignore[import]
+from docx.text.paragraph import Paragraph  # type: ignore[import]
+from loguru import logger
+from maus.edifact import EdifactFormat
+
+
+def find_file_to_format(message_formats: list[EdifactFormat], input_dir: Path) -> dict[EdifactFormat, Path]:
+    """
+    Maps each format to the single .docx file in input_dir whose name contains it; aborts if no format matches.
+    """
+    file_dict: dict[EdifactFormat, Path] = {}
+    for message_format in message_formats:
+        list_of_all_files = [
+            file for file in input_dir.iterdir() if message_format in file.name and file.suffix == ".docx"
+        ]
+        if len(list_of_all_files) == 1:
+            file_dict[message_format] = list_of_all_files[0]
+        elif len(list_of_all_files) > 1:
+            logger.warning(f"⚠️ There are several files for {message_format}.")
+        else:
+            logger.warning(f"⚠️ No file found for {message_format}.")
+    if file_dict:
+        return file_dict
+    logger.error("❌ No files found in the input directory.")
+    raise click.Abort()
+
+
+def preliminary_output_as_json(table: list[str], message_format: EdifactFormat, output_dir: Path) -> None:
+    """
+    Writes the preliminary output as json.
+    Serves only as a preliminary helper function until more precise class methods are implemented.
+    """
+    output_dir.mkdir(parents=True, exist_ok=True)
+    file_path = output_dir.joinpath(f"{message_format}_preliminary_output.json")
+    structured_json = {line: None for line in table}
+    with open(file_path, "w", encoding="utf-8") as json_file:
+        json.dump(structured_json, json_file, indent=4)
+    logger.info(f"Created and wrote to {file_path}")
+
+
+def get_paragraphs_up_to_diagram(parent: Union[Document, _Cell]) -> Generator[Union[Paragraph, Table], None, None]:
+    """Yields each table that is a direct child of the given document body or cell; paragraphs are skipped."""
+    # pylint: disable=protected-access
+    if isinstance(parent, Document):
+        parent_elm = parent.element.body
+    elif isinstance(parent, _Cell):
+        parent_elm = parent._tc
+    else:
+        raise ValueError("Passed parent argument must be of type Document or _Cell")
+
+    for child in parent_elm.iterchildren():
+        if isinstance(child, CT_Tbl):
+            yield Table(child, parent)
+
+
+def parse_raw_nachrichtenstrukturzeile(input_path: Path) -> list[str]:
+    """
+    Collects the raw Nachrichtenstruktur lines of a docx file: the texts of all table cells after the header cell.
+    """
+    # pylint: disable=protected-access
+    doc = docx.Document(input_path)
+    docx_objects = get_paragraphs_up_to_diagram(doc)
+    mig_tables = []
+    nachrichtenstruktur_header = "Status\tMaxWdh\n\tZähler\tNr\tBez\tSta\tBDEW\tSta\tBDEW\tEbene\tInhalt"
+    for docx_object in docx_objects:
+        for ind, line in enumerate(docx_object._cells):
+            # marks the beginning of the complete Nachrichtenstruktur table
+            if line.text == nachrichtenstruktur_header:
+                mig_tables.extend([row.text for row in docx_object._cells[ind + 1 :]])
+                break
+    # filter empty rows and headers
+    mig_tables = [row for row in mig_tables if row not in ("\n", nachrichtenstruktur_header)]
+    return mig_tables
diff --git a/tox.ini b/tox.ini
index 2826330..5c5ca16 100644
--- a/tox.ini
+++ b/tox.ini
@@ -55,7 +55,6 @@ commands =
 
 [testenv:coverage]
 # the coverage environment is called by the Github Action that runs the coverage measurement
-changedir = unittests
 deps =
     {[testenv:tests]deps}
     -r dev_requirements/requirements-coverage.txt
diff --git a/unittests/test_parsing.py b/unittests/test_parsing.py
new file mode 100644
index 0000000..7397c7e
--- /dev/null
+++ b/unittests/test_parsing.py
@@ -0,0 +1,64 @@
+"""
+Test parsing routines.
+"""
+
+import json
+import logging
+from pathlib import Path
+
+from maus.edifact import EdifactFormat
+from pytest_loguru.plugin import caplog  # type: ignore[import] # pylint: disable=unused-import
+
+from migmose.parsing import find_file_to_format, parse_raw_nachrichtenstrukturzeile, preliminary_output_as_json
+
+
+class TestParsing:
+    """
+    Test class for parsing functions.
+    """
+
+    def test_find_file_to_format(self):
+        """
+        Test find_file_to_format function. Tests whether the MIG to edifact format ORDCHG is found in the test folder.
+        """
+        message_format = [EdifactFormat.ORDCHG]
+        input_dir = Path("unittests/test_data/")
+        file_dict = find_file_to_format(message_format, input_dir)
+        assert file_dict[EdifactFormat.ORDCHG] == input_dir / Path("ORDCHG_MIG_1_1_info_20230331_v2.docx")
+
+    def test_find_only_one_file(self, caplog):
+        """
+        Tests to find multiple formats when one is not present.
+        """
+        message_formats = [EdifactFormat.ORDCHG, EdifactFormat.ORDRSP]
+        input_dir = Path("unittests/test_data/")
+        with caplog.at_level(logging.WARNING):
+            file_dict = find_file_to_format(message_formats, input_dir)
+            assert f"No file found for {EdifactFormat.ORDRSP}." in caplog.text
+        assert file_dict[EdifactFormat.ORDCHG] == input_dir / Path("ORDCHG_MIG_1_1_info_20230331_v2.docx")
+
+    def test_parse_raw_nachrichtenstrukturzeile(self):
+        """
+        Test to parse the raw nachrichtenstrukturzeile from a docx file.
+        """
+        input_file = Path("unittests/test_data/ORDCHG_MIG_1_1_info_20230331_v2.docx")
+        mig_table = parse_raw_nachrichtenstrukturzeile(input_file)
+        assert len(mig_table) == 18
+        assert "Nachrichten-Kopfsegment" in mig_table[0]
+        assert "Nachrichten-Endesegment" in mig_table[-1]
+
+    def test_preliminary_output_as_json(self, tmp_path):
+        """Tests the preliminary output as json function.
+        Asserts that the output file exists and has the correct content."""
+        table = ["line1", "line2", "line3"]
+        message_format = EdifactFormat.ORDCHG
+        output_dir = tmp_path / Path("output")
+
+        preliminary_output_as_json(table, message_format, output_dir)
+
+        file_path = output_dir / f"{message_format}_preliminary_output.json"
+        assert file_path.exists()
+
+        with open(file_path, "r", encoding="utf-8") as json_file:
+            content = json.load(json_file)
+        assert content == {"line1": None, "line2": None, "line3": None}