-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathPandasUtils.py
148 lines (124 loc) · 4.4 KB
/
PandasUtils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# coding: utf-8
#!/usr/bin/env python
import pandas as pd
import json
from pandas.io.json import json_normalize
import unicodedata as ucd
from openpyxl import Workbook
class PandasUtils:
    """Static convenience wrappers around common pandas DataFrame operations."""

    @staticmethod
    def getWkbk(fn):
        """Open the Excel workbook at ``fn`` and return it as a ``pd.ExcelFile``."""
        return pd.ExcelFile(fn)

    @staticmethod
    def removeCols(df, list_of_cols_to_remove):
        """Return a new DataFrame without the listed columns.

        Names that are not columns of ``df`` are silently ignored. ``df``
        itself is not modified; callers must use the returned frame.
        """
        existing_cols = list(df.columns)
        # Only drop columns that actually exist so drop() cannot raise KeyError.
        present = [col for col in list_of_cols_to_remove if col in existing_cols]
        return df.drop(present, axis=1)

    @staticmethod
    def castDateFieldsAsString(df, list_of_date_cols, dt_format):
        """Replace each listed column of ``df`` with its ``strftime`` text.

        The columns are overwritten on ``df`` itself (via the ``.dt``
        accessor) and the same frame is returned.
        """
        for col in list_of_date_cols:
            df[col] = df[col].dt.strftime(dt_format)
        return df

    @staticmethod
    def loadCsv(fullpath):
        """Read the CSV at ``fullpath``; return the DataFrame, or None on failure.

        Best-effort loader: any error raised by ``pd.read_csv`` is printed
        and swallowed, and ``None`` is returned instead.
        """
        try:
            return pd.read_csv(fullpath)
        except Exception as e:
            print(str(e))
            return None

    @staticmethod
    def fillNaWithBlank(df):
        """Return a copy of ``df`` with missing values replaced by ""."""
        return df.fillna("")

    @staticmethod
    def makeDfFromJson(json_obj):
        """Flatten ``json_obj`` into a DataFrame with ``json_normalize``."""
        # Prefer the top-level function when this pandas provides it; otherwise
        # use the module-level ``json_normalize`` name imported by this file.
        normalize = getattr(pd, "json_normalize", None)
        if normalize is None:
            normalize = json_normalize
        return normalize(json_obj)

    @staticmethod
    def convertDfToDictrows(df):
        """Return the rows of ``df`` as a list of ``{column: value}`` dicts."""
        return df.to_dict(orient='records')

    @staticmethod
    def mapFieldNames(df, field_mapping_dict):
        """Return ``df`` with columns renamed according to ``field_mapping_dict``."""
        return df.rename(columns=field_mapping_dict)

    @staticmethod
    def renameCols(df, colMappingDict):
        """Return ``df`` with columns renamed according to ``colMappingDict``."""
        return df.rename(columns=colMappingDict)

    @staticmethod
    def groupbyCountStar(df, group_by_list):
        """Count rows per group; the counts land in a column named 'count'."""
        return df.groupby(group_by_list).size().reset_index(name='count')

    @staticmethod
    def colToLower(df, field_name):
        """Cast a column to str, lower-case it and strip surrounding whitespace.

        The column is overwritten on ``df`` itself and the same frame is
        returned.
        """
        df[field_name] = df[field_name].astype(str).str.lower().str.strip()
        return df

    @staticmethod
    def makeLookupDictOnTwo(df, key_col, val_col):
        """Build a dict mapping each ``key_col`` value to its ``val_col`` value.

        When a key occurs more than once, the last row wins.
        """
        return dict(zip(df[key_col], df[val_col]))

    @staticmethod
    def dfToHTMLTable(df, headers):
        """Render the selected columns of ``df`` as an HTML table string.

        Args:
            df: DataFrame to render.
            headers: Column names to include, given as ONE string separated
                by ", " (comma followed by a space), e.g. "name, age". The
                names are used both as header cells and as column lookups.

        Returns:
            The table markup, with all fragments joined by single spaces.

        Raises:
            KeyError: If a name in ``headers`` is not a column of ``df``.

        Note:
            Cell values are converted with ``str()`` and inserted verbatim;
            they are NOT HTML-escaped. ``None`` cells are rendered empty.
        """
        def html_tags(tag, item):
            # A closing tag carries only the element name; attributes such as
            # style="..." belong to the opening tag alone.
            element = tag[1:-1].split()[0]
            content = "" if item is None else str(item)
            return tag + content + "</" + element + ">"

        cell_tag = '<td style="border: 1px solid black; padding:2px">'
        columns = headers.split(', ')

        table_rows = ['<table style="border-collapse:collapse">', '<tr style="border: 1px solid black;">']
        table_rows.extend(html_tags('<th>', name) for name in columns)
        table_rows.append('</tr>')

        for record in PandasUtils.convertDfToDictrows(df):
            table_rows.append('<tr>')
            table_rows.extend(html_tags(cell_tag, record[name]) for name in columns)
            table_rows.append('</tr>')

        table_rows.append("</table>")
        return " ".join(table_rows)
class PostGresPandas:
    """Query helpers that hand results back as pandas DataFrames."""

    @staticmethod
    def qryToDF(conn_alq, qry):
        """Run ``qry`` on the connection ``conn_alq`` and return the rows as a DataFrame."""
        return pd.read_sql_query(qry, con=conn_alq)
class PandasToExcel:
    """Write DataFrames to an .xlsx workbook through pandas' xlsxwriter engine."""

    @staticmethod
    def _openWriter(wkbk_fn):
        """Create the xlsxwriter-backed ``pd.ExcelWriter`` for ``wkbk_fn``."""
        xlsx_options = {'remove_timezone': True}
        try:
            # Current pandas forwards engine settings through ``engine_kwargs``.
            return pd.ExcelWriter(wkbk_fn, engine='xlsxwriter',
                                  engine_kwargs={'options': xlsx_options})
        except TypeError:
            # Older pandas has no ``engine_kwargs`` and takes ``options`` directly.
            return pd.ExcelWriter(wkbk_fn, engine='xlsxwriter', options=xlsx_options)

    @staticmethod
    def _columnWidth(series, default_width=15):
        """Return a width fitting the longest cell text or the header of ``series``."""
        try:
            header_len = len(str(series.name))
            longest_cell = series.astype(str).map(len).max()
            if pd.isna(longest_cell):
                # Empty column: there are no cells to measure, size by the header.
                width = header_len + 1
            else:
                width = max(int(longest_cell), header_len) + 1  # a little extra space
        except Exception:
            # Anything that cannot be measured keeps the fixed fallback width.
            return default_width
        # Excel's maximum column width is 255 characters.
        return min(width, 255)

    @staticmethod
    def writeWkBks(wkbk_fn, df_shts, showIndex=False):
        '''Write each DataFrame in ``df_shts`` to its own sheet of ``wkbk_fn``.

        ``df_shts`` is a list of dict items structured like this:
        {'sht_name': 'mysheet', 'df': df}

        Every sheet is zoomed to 90%, its columns are sized to their
        contents, and the header row gets a bold Arial format. When
        ``showIndex`` is true the index is written too and the data-column
        widths are shifted past the index columns.

        Returns True once the workbook has been written; errors propagate.
        '''
        # The context manager closes (and thereby saves) the workbook.
        with PandasToExcel._openWriter(wkbk_fn) as writer:
            header_format = writer.book.add_format({'bold': True, 'font_size': '35', 'font_name': 'Arial'})
            for sheetItem in df_shts:
                frame = sheetItem['df']
                sheet_name = sheetItem['sht_name']
                frame.to_excel(writer, index=showIndex, sheet_name=sheet_name)
                worksheet = writer.sheets[sheet_name]  # pull worksheet object
                worksheet.set_zoom(90)
                # Index levels occupy the leading sheet columns when written.
                first_data_col = frame.index.nlevels if showIndex else 0
                for idx, col in enumerate(frame):  # loop through all columns
                    width = PandasToExcel._columnWidth(frame[col])
                    worksheet.set_column(first_data_col + idx, first_data_col + idx, width)
                worksheet.set_row(0, None, header_format)
        return True
if __name__ == "__main__":
    # This module only defines helper classes and has no script entry point,
    # so running it directly is intentionally a no-op.
    pass