Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move parsing functions #13

Merged
merged 32 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
93696df
📍pin virtualenv version in pyproject.toml
DeltaDaniel Feb 27, 2024
8babf02
init datamodell
DeltaDaniel Feb 27, 2024
d2dd314
fix minor issue
DeltaDaniel Feb 27, 2024
b633c2b
spell_check
DeltaDaniel Feb 27, 2024
b18cc4a
updated pre-commit hooks
DeltaDaniel Feb 27, 2024
c35b4d9
black
DeltaDaniel Feb 27, 2024
a3b95f0
add logger
DeltaDaniel Feb 27, 2024
7004055
read line by line
DeltaDaniel Feb 28, 2024
b7d7eb4
Update pyproject.toml
DeltaDaniel Feb 28, 2024
848b511
refined line parser
DeltaDaniel Feb 28, 2024
0b446b7
Merge remote-tracking branch 'origin/main' into DDB/add_logger
DeltaDaniel Mar 4, 2024
fc63826
Merge remote-tracking branch 'origin/main' into DDB/add_logger
DeltaDaniel Mar 4, 2024
f977e95
changed logger to loguru
DeltaDaniel Mar 4, 2024
76cd270
added examples to some docstrings
DeltaDaniel Mar 4, 2024
09e83f0
Merge branch 'DDB/add_logger' into DDB/read_docx
DeltaDaniel Mar 4, 2024
f54f4ac
removed unused code
DeltaDaniel Mar 4, 2024
a7cb35f
added CLI and input/output
DeltaDaniel Mar 4, 2024
5178c4f
added json output
DeltaDaniel Mar 4, 2024
171c193
fixed linting, testing, etc, issues :-)
DeltaDaniel Mar 5, 2024
c8d75af
Merge remote-tracking branch 'origin/main' into DDB/add_FH_CLI
DeltaDaniel Mar 5, 2024
fb96d60
Update pyproject.toml
DeltaDaniel Mar 6, 2024
8e44a9d
Update src/migmose/__main__.py
DeltaDaniel Mar 6, 2024
a97ca36
message_type from maus.edifact.EdifactFormat
DeltaDaniel Mar 6, 2024
9042250
moved parsing functions, added test for find_file_to_type function
DeltaDaniel Mar 6, 2024
406d29e
added tests for parsing module
DeltaDaniel Mar 6, 2024
0f461fe
Merge branch 'main' into DDB/move_parsing_functions
DeltaDaniel Mar 6, 2024
5df6746
black
DeltaDaniel Mar 6, 2024
109bcb4
renamed NachrichtenTYPE -> NachrichtenFORMAT
DeltaDaniel Mar 12, 2024
4bbdc79
refined documentation
DeltaDaniel Mar 12, 2024
62978f0
simplified preliminary_output_as_json function
DeltaDaniel Mar 12, 2024
08d97ed
Merge remote-tracking branch 'origin/main' into DDB/move_parsing_func…
DeltaDaniel Mar 12, 2024
6e6c646
after merge requirements-compile
DeltaDaniel Mar 12, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dev_requirements/requirements-coverage.in
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
# specific requirements for the tox coverage env
coverage
pytest_loguru
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uh nice, was ist es, was tut es?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

20 changes: 19 additions & 1 deletion dev_requirements/requirements-coverage.txt
Original file line number Diff line number Diff line change
@@ -1,9 +1,27 @@
# SHA1:6dafbcf610e9f81897b65ee9142715ab2e793f9e
# SHA1:5c99babe62754a9fe23771e877f18d99a3faa208
#
# This file is autogenerated by pip-compile-multi
# To update, run:
#
# pip-compile-multi
#
colorama==0.4.6
# via
# loguru
# pytest
coverage==7.4.3
# via -r dev_requirements\requirements-coverage.in
iniconfig==2.0.0
# via pytest
loguru==0.7.2
# via pytest-loguru
packaging==24.0
# via pytest
pluggy==1.4.0
# via pytest
pytest==8.1.1
# via pytest-loguru
pytest-loguru==0.3.0
# via -r dev_requirements\requirements-coverage.in
win32-setctime==1.1.0
# via loguru
1 change: 1 addition & 0 deletions dev_requirements/requirements-tests.in
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
# specific requirements for the tox tests env
pytest
pytest_loguru # makes loguru logs accessible to pytest
14 changes: 12 additions & 2 deletions dev_requirements/requirements-tests.txt
Original file line number Diff line number Diff line change
@@ -1,17 +1,27 @@
# SHA1:0eaa389e1fdb3a1917c0f987514bd561be5718ee
# SHA1:c2ec5869b1beb7fc76c4dcc722605bc1512f62aa
#
# This file is autogenerated by pip-compile-multi
# To update, run:
#
# pip-compile-multi
#
colorama==0.4.6
# via pytest
# via
# loguru
# pytest
iniconfig==2.0.0
# via pytest
loguru==0.7.2
# via pytest-loguru
packaging==24.0
# via pytest
pluggy==1.4.0
# via pytest
pytest==8.1.1
# via
# -r dev_requirements\requirements-tests.in
# pytest-loguru
pytest-loguru==0.3.0
# via -r dev_requirements\requirements-tests.in
win32-setctime==1.1.0
# via loguru
93 changes: 9 additions & 84 deletions src/migmose/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,13 @@
contains CLI logic for migmose.
"""

import json
from pathlib import Path
from typing import Generator, Union

import click
import docx # type: ignore[import]
from docx.document import Document # type: ignore[import]
from docx.oxml import CT_Tbl # type: ignore[import]
from docx.table import Table, _Cell # type: ignore[import]
from docx.text.paragraph import Paragraph # type: ignore[import]
from loguru import logger
from maus.edifact import EdifactFormat

from migmose.parsing import find_file_to_format, parse_raw_nachrichtenstrukturzeile, preliminary_output_as_json


# add CLI logic
@click.command()
Expand All @@ -27,11 +21,11 @@
)
@click.option(
"-mt",
"--message_type",
"--message_format",
type=click.Choice(list(map(lambda x: x.name, EdifactFormat)), case_sensitive=False),
# Taken from https://github.com/pallets/click/issues/605#issuecomment-889462570
prompt="Please specify which message type to be parsed.",
help="Defines the set of message types to be parsed.",
prompt="Please specify which message format to be parsed.",
help="Defines the set of message formats to be parsed.",
multiple=True,
)
@click.option(
Expand All @@ -41,85 +35,16 @@
prompt="Please enter the path to the directory which should contain the output files.",
help="Set path to directory which contains the output files. If the directory does not exist, it will be created.",
)
def main(input_dir: Path, output_dir, message_type: list[EdifactFormat]) -> None:
def main(input_dir: Path, output_dir, message_format: list[EdifactFormat]) -> None:
"""
Main function. Uses CLI input.
"""
dict_files = find_file_to_type(message_type, input_dir)
for m_type, file in dict_files.items():
dict_files = find_file_to_format(message_format, input_dir)
for m_format, file in dict_files.items():
mig_table = parse_raw_nachrichtenstrukturzeile(file)
for item in mig_table:
print(item)
preliminary_output_as_json(mig_table, m_type, output_dir)


def find_file_to_type(message_types: list[EdifactFormat], input_dir: Path) -> dict[EdifactFormat, Path]:
    """
    Find, for each requested message type, the single ``.docx`` file in ``input_dir``
    whose file name contains that type.

    Types with zero or with multiple matching files are logged as warnings and left
    out of the result. If no type matched any file at all, logs an error and aborts
    the CLI via ``click.Abort``.
    """
    # NOTE(review): ``message_type in file.name`` assumes EdifactFormat behaves like
    # a str (substring test) — confirm against maus.edifact.EdifactFormat.
    file_dict: dict[EdifactFormat, Path] = {}
    for message_type in message_types:
        matching_files = [
            file for file in input_dir.iterdir() if message_type in file.name and file.suffix == ".docx"
        ]
        if len(matching_files) == 1:
            file_dict[message_type] = matching_files[0]
        elif len(matching_files) > 1:
            # FIX: dropped ``fg="red"`` — that is a click.secho parameter; loguru
            # treats unknown keyword arguments as str.format args, not terminal colors.
            logger.warning(f"⚠️ There are several files for {message_type}.")
        else:
            logger.warning(f"⚠️ No file found for {message_type}.")
    if file_dict:
        return file_dict
    logger.error("⚠️ No files found in the input directory.")
    raise click.Abort()


def preliminary_output_as_json(table: list[str], message_type: EdifactFormat, output_dir: Path) -> None:
    """
    Write the raw table lines to ``<message_type>_preliminary_output.json`` in ``output_dir``.

    Each raw line becomes a JSON key mapped to ``None`` (duplicate lines collapse
    into a single key). Creates ``output_dir`` including parents if it is missing.
    """
    # mkdir(exist_ok=True) already tolerates an existing directory, so the previous
    # ``if not output_dir.exists()`` guard was redundant (and race-prone).
    output_dir.mkdir(parents=True, exist_ok=True)
    file_path = output_dir.joinpath(f"{message_type}_preliminary_output.json")
    structured_json = {line: None for line in table}
    with open(file_path, "w", encoding="utf-8") as json_file:
        # BUG FIX: json.dump() has no ``encoding`` parameter in Python 3 — passing it
        # raised TypeError. The file object above is already opened as UTF-8.
        json.dump(structured_json, json_file, indent=4)
    logger.info(f"Created and wrote to {file_path}")


def get_paragraphs_up_to_diagram(parent: Union[Document, _Cell]) -> Generator[Union[Paragraph, Table], None, None]:
    """
    Iterate over the direct children of *parent*'s body element and yield each
    table as a ``docx.table.Table``.

    NOTE(review): despite the name and the return annotation, only ``CT_Tbl``
    children are yielded here — paragraphs are skipped; confirm whether that is
    intended. Raises ``ValueError`` if *parent* is neither Document nor _Cell.
    """
    # pylint: disable=protected-access
    if isinstance(parent, Document):
        parent_elm = parent.element.body
    elif isinstance(parent, _Cell):
        # _tc is the underlying XML element of the cell (python-docx internal)
        parent_elm = parent._tc
    else:
        raise ValueError("Passed parent argument must be of type Document or _Cell")

    for child in parent_elm.iterchildren():
        if isinstance(child, CT_Tbl):
            yield Table(child, parent)


def parse_raw_nachrichtenstrukturzeile(input_path: Path) -> list[str]:
    """
    Parse the raw "Nachrichtenstruktur" rows from the tables of the docx file at
    *input_path*.

    Scans each table for the cell whose text equals the known header row and
    collects the text of all following cells of that table. Empty rows and
    repeated header rows are filtered out. Returns the remaining raw lines.
    """
    # pylint: disable=protected-access
    doc = docx.Document(input_path)
    docx_objects = get_paragraphs_up_to_diagram(doc)
    mig_tables: list[str] = []
    # exact header text (tab-separated cell content) that starts the structure table
    nachrichtenstruktur_header = "Status\tMaxWdh\n\tZähler\tNr\tBez\tSta\tBDEW\tSta\tBDEW\tEbene\tInhalt"
    for docx_object in docx_objects:
        for ind, line in enumerate(docx_object._cells):
            # marks the beginning of the complete nachrichtenstruktur table
            if line.text == nachrichtenstruktur_header:
                mig_tables.extend([row.text for row in docx_object._cells[ind + 1 :]])
                break
    # filter empty rows and headers
    mig_tables = [row for row in mig_tables if row not in ("\n", nachrichtenstruktur_header)]
    return mig_tables
preliminary_output_as_json(mig_table, m_format, output_dir)


if __name__ == "__main__":
Expand Down
85 changes: 85 additions & 0 deletions src/migmose/parsing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
"""
contains functions for file handling and parsing.
"""

import json
from pathlib import Path
from typing import Generator, Union

import click
import docx # type: ignore[import]
from docx.document import Document # type: ignore[import]
from docx.oxml import CT_Tbl # type: ignore[import]
from docx.table import Table, _Cell # type: ignore[import]
from docx.text.paragraph import Paragraph # type: ignore[import]
from loguru import logger
from maus.edifact import EdifactFormat


def find_file_to_format(message_formats: list[EdifactFormat], input_dir: Path) -> dict[EdifactFormat, Path]:
"""
finds the file with the message type in the input directory
"""
file_dict = {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

beschwert sich mypy hier nicht über den fehlenden typehint ^^?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Finde ich gerade selber komisch, aber nein 🤔

for message_format in message_formats:
list_of_all_files = [
file for file in input_dir.iterdir() if message_format in file.name and file.suffix == ".docx"
]
if len(list_of_all_files) == 1:
file_dict[message_format] = list_of_all_files[0]
elif len(list_of_all_files) > 1:
logger.warning(f"⚠️ There are several files for {message_format}.", fg="red")
else:
logger.warning(f"⚠️ No file found for {message_format}.", fg="red")
if file_dict:
return file_dict
logger.error("❌ No files found in the input directory.", fg="red")
raise click.Abort()


def preliminary_output_as_json(table: list[str], message_format: EdifactFormat, output_dir: Path) -> None:
    """
    Dump the raw table lines as a JSON object into ``output_dir``.

    Each raw line becomes a key mapped to ``None``. Serves only as a preliminary
    helper until more precise class methods are implemented.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    file_path = output_dir / f"{message_format}_preliminary_output.json"
    # dict.fromkeys maps every line to None (the default fill value)
    payload = dict.fromkeys(table)
    with open(file_path, "w", encoding="utf-8") as json_file:
        json.dump(payload, json_file, indent=4)
    logger.info(f"Created and wrote to {file_path}")


def get_paragraphs_up_to_diagram(parent: Union[Document, _Cell]) -> Generator[Union[Paragraph, Table], None, None]:
    """
    Yield every table found among the direct children of *parent*'s body element.

    Raises ValueError for any parent type other than Document or _Cell.
    """
    # pylint: disable=protected-access
    if isinstance(parent, _Cell):
        body_element = parent._tc
    elif isinstance(parent, Document):
        body_element = parent.element.body
    else:
        raise ValueError("Passed parent argument must be of type Document or _Cell")

    for element in body_element.iterchildren():
        if isinstance(element, CT_Tbl):
            yield Table(element, parent)


def parse_raw_nachrichtenstrukturzeile(input_path: Path) -> list[str]:
"""
parses raw nachrichtenstrukturzeile from a table. returns list of raw lines
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
parses raw nachrichtenstrukturzeile from a table. returns list of raw lines
Parses raw nachrichtenstrukturzeile from a table. Returns list of raw lines.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"""
# pylint: disable=protected-access
doc = docx.Document(input_path)
docx_objects = get_paragraphs_up_to_diagram(doc)
mig_tables = []
nachrichtenstruktur_header = "Status\tMaxWdh\n\tZähler\tNr\tBez\tSta\tBDEW\tSta\tBDEW\tEbene\tInhalt"
for docx_object in docx_objects:
for ind, line in enumerate(docx_object._cells):
# marks the beginning of the complete nachrichtentruktur table
if line.text == nachrichtenstruktur_header:
mig_tables.extend([row.text for row in docx_object._cells[ind + 1 :]])
break
# filter empty rows and headers
mig_tables = [row for row in mig_tables if row not in ("\n", nachrichtenstruktur_header)]
return mig_tables
1 change: 0 additions & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ commands =

[testenv:coverage]
# the coverage environment is called by the Github Action that runs the coverage measurement
changedir = unittests
deps =
{[testenv:tests]deps}
-r dev_requirements/requirements-coverage.txt
Expand Down
64 changes: 64 additions & 0 deletions unittests/test_parsing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""
Test parsing routines.
"""

import json
import logging
from pathlib import Path

from maus.edifact import EdifactFormat
from pytest_loguru.plugin import caplog # type: ignore[import] # pylint: disable=unused-import

from migmose.parsing import find_file_to_format, parse_raw_nachrichtenstrukturzeile, preliminary_output_as_json


class TestParsing:
    """
    Test class for parsing functions.
    """

    def test_find_file_to_format(self):
        """
        Test find_file_to_format function. Tests whether the MIG for edifact format ORDCHG is found in the test folder.
        """
        test_data_dir = Path("unittests/test_data/")
        result = find_file_to_format([EdifactFormat.ORDCHG], test_data_dir)
        expected_file = test_data_dir / "ORDCHG_MIG_1_1_info_20230331_v2.docx"
        assert result[EdifactFormat.ORDCHG] == expected_file

    def test_find_only_one_file(self, caplog):
        """
        Tests to find multiple formats when one is not present.
        """
        test_data_dir = Path("unittests/test_data/")
        requested_formats = [EdifactFormat.ORDCHG, EdifactFormat.ORDRSP]
        with caplog.at_level(logging.WARNING):
            result = find_file_to_format(requested_formats, test_data_dir)
        assert f"No file found for {EdifactFormat.ORDRSP}." in caplog.text
        assert result[EdifactFormat.ORDCHG] == test_data_dir / "ORDCHG_MIG_1_1_info_20230331_v2.docx"

    def test_parse_raw_nachrichtenstrukturzeile(self):
        """
        Test to parse the raw nachrichtenstrukturzeile from a docx file.
        """
        docx_path = Path("unittests/test_data/ORDCHG_MIG_1_1_info_20230331_v2.docx")
        raw_lines = parse_raw_nachrichtenstrukturzeile(docx_path)
        assert len(raw_lines) == 18
        assert "Nachrichten-Kopfsegment" in raw_lines[0]
        assert "Nachrichten-Endesegment" in raw_lines[-1]

    def test_preliminary_output_as_json(self, tmp_path):
        """Tests the preliminary output as json function.
        Asserts that the output file exists and has the correct content."""
        raw_lines = ["line1", "line2", "line3"]
        target_dir = tmp_path / "output"

        preliminary_output_as_json(raw_lines, EdifactFormat.ORDCHG, target_dir)

        json_path = target_dir / f"{EdifactFormat.ORDCHG}_preliminary_output.json"
        assert json_path.exists()

        with open(json_path, "r", encoding="utf-8") as json_file:
            assert json.load(json_file) == {"line1": None, "line2": None, "line3": None}