# Python script to load data
# Example: load_data.py court raw
# (uploads the court data csvs found under the file path defined in
# constants.py to the schema raw in the database)
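# Assumed inputs, inferred from how they are used below rather than defined here:
#   constants.ENGINE_PATH - SQLAlchemy database URL for the postgres instance
#   constants.CCL_PATHS   - list of paths to the CCL Excel files
#   constants.COURT_PATH  - root directory containing the court csvs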
# Import necessary libraries
import pandas as pd
import sqlalchemy
import ohio.ext.pandas  # registers the DataFrame.pg_copy_to accessor
import pathlib
import argparse
import logging

# Imports from our project
import src.utils.constants as constants
from src.utils.file_processing import tidy_name
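# tidy_name is assumed (from its usage below) to normalize sheet, column, and
# file names into identifiers that are safe to use as postgres table/column names.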
# Set up logging
logging.basicConfig(filename="data_loading.log", filemode="w", level=logging.INFO)
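# Note: filemode="w" truncates data_loading.log on every run; switch to
# filemode="a" to keep a history across runs.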
# Take in arguments for the data type and the schema to upload to
parser = argparse.ArgumentParser(description="Load one type of data to one schema.")
parser.add_argument(
    "data", type=str, default="court", help="name of the data, either ccl or court"
)
parser.add_argument("schema", type=str, default="tmp_raw", help="name of the schema")
args = parser.parse_args()
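# Note: argparse only applies `default` to positional arguments when nargs="?"
# is given, so both arguments above are required in practice, e.g.:
#   python load_data.py court tmp_raw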
logging.info("args.data: %s", args.data)
logging.info("args.schema: %s", args.schema)
# Check if the schema exists and create it otherwise
# Start engine connection
engine = sqlalchemy.create_engine(constants.ENGINE_PATH)
inspector = sqlalchemy.inspect(engine)
if args.schema not in inspector.get_schema_names():
    with engine.connect() as conn:
        with conn.begin():
            conn.execute(sqlalchemy.text('SET ROLE "kcmo-mc-role";'))
            conn.execute(sqlalchemy.text(f"CREATE SCHEMA {args.schema};"))
# IF CCL DATA
if args.data == "ccl":
    # Grab appropriate paths
    ccl_paths = constants.CCL_PATHS
    xls = [pd.ExcelFile(ccl_path) for ccl_path in ccl_paths]
    # Inspect sheet names and check that they match across files
    sheet_names = xls[0].sheet_names
    logging.info("There are %d sheets in the file", len(sheet_names))
    logging.info("Sheet names: %s", sheet_names)
    for xl in xls:
        if xls[0].sheet_names != xl.sheet_names:
            logging.warning("Excel files have different sheets")
    # For each sheet name, combine components across the excel files
    for sheet_name in sheet_names:
        logging.info("Now on sheet named: %s", sheet_name)
        # For each ccl_path, get the corresponding dataframe
        df_list = [xl.parse(sheet_name) for xl in xls]
        # Error handling for sheets that fail to combine
        try:
            df = pd.concat(df_list).reset_index()
        except Exception as e:
            logging.error("ERROR IN DATAFRAME CONCAT: %s", e)
            continue  # skip this sheet rather than uploading stale data
        # Update the column names
        df.columns = [tidy_name(c) for c in df.columns.values.tolist()]
        # Create clean sheet name
        clean_sheet_name = tidy_name(sheet_name)
        # Display cleaned sheet name
        logging.info("Cleaned sheet name: %s", clean_sheet_name)
        # Upload combined dataframe to the database as a sql table using Ohio
        with engine.connect() as conn:
            with conn.begin():
                conn.execute(sqlalchemy.text('SET ROLE "kcmo-mc-role";'))
                df.pg_copy_to(
                    schema=args.schema,
                    name=clean_sheet_name,
                    con=conn,
                    index=False,
                    if_exists="replace",
                )
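        # pg_copy_to is attached to DataFrame by the ohio.ext.pandas import and
        # streams rows via PostgreSQL COPY; the stock-pandas equivalent (slower
        # on large frames) would be roughly:
        #   df.to_sql(clean_sheet_name, conn, schema=args.schema,
        #             index=False, if_exists="replace")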
# IF COURTS DATA
elif args.data == "court":

    def get_filename(path):
        # Return the full filepath of every csv in the directory and its subdirectories
        rootdir = pathlib.Path(path)
        files = [f for f in rootdir.glob("**/*") if f.is_file() and f.suffix == ".csv"]
        return files
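    # Roughly equivalent, using pathlib's recursive glob:
    #   files = [f for f in rootdir.rglob("*.csv") if f.is_file()]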
    def mass_upload(files):
        # Upload court files in bulk
        for file in files:
            logging.info("Uploading %s", file)
            try:
                clean_csv_name = tidy_name(file.name)
                try:
                    # Make everything a string so that all of the column types
                    # are the same between files of the same table
                    df = pd.read_csv(file, dtype=str)
                except Exception as e:
                    logging.error("Failed on %s", clean_csv_name)
                    logging.error("Error message: %s", e)
                    logging.info("Retrying %s, skipping bad lines", file)
                    # on_bad_lines="skip" requires pandas >= 1.3
                    df = pd.read_csv(file, on_bad_lines="skip", dtype=str)
                df.columns = [tidy_name(c) for c in df.columns.values.tolist()]
                # Copy the dataframe to postgres (to the right schema and table name)
                with engine.connect() as conn:
                    with conn.begin():
                        conn.execute(sqlalchemy.text('SET ROLE "kcmo-mc-role";'))
                        df.pg_copy_to(
                            schema=args.schema,
                            name=clean_csv_name,
                            con=conn,
                            index=False,
                            if_exists="replace",
                        )
            except Exception as e:
                logging.error("Failed completely to upload file: %s", file)
                logging.error("Error message: %s", e)
    # Get path from constants.py
    path = constants.COURT_PATH
    files = get_filename(path)
    mass_upload(files)
else:
    print("Invalid data argument: must be either 'ccl' or 'court'")
    print("Use the -h flag to get info on argument options.")