-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathetl_util_users.py
executable file
·97 lines (80 loc) · 3.67 KB
/
etl_util_users.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import json
from contextlib import contextmanager
import psycopg2
from psycopg2.extras import Json, DictCursor
from psycopg2.pool import ThreadedConnectionPool
from config import deploy_as_docker, db_users_table, users_json_data, logger, db_port, get_db_credential
from config import dbConnection
# Module-level connection pool shared by everything in this file. It is
# constructed when the module is imported, from the DSN in `dbConnection`,
# and is configured for between 1 and 10 connections.
connectionpool = ThreadedConnectionPool(
    minconn=1,
    maxconn=10,
    dsn=dbConnection,
)
@contextmanager
def getconn():
    """Borrow a connection from ``connectionpool`` for the span of a ``with`` block.

    Yields:
        The connection returned by ``connectionpool.getconn()``.

    The connection is handed back with ``connectionpool.putconn()`` when the
    ``with`` block exits, whether it finishes normally or raises.
    """
    borrowed = connectionpool.getconn()
    try:
        yield borrowed
    finally:
        # Always give the connection back, even if the caller's block failed.
        connectionpool.putconn(borrowed)
class users_crud:
    """Load user records into PostgreSQL and read them back.

    Both public methods operate on the table named by ``db_users_table``.
    Database failures are logged through ``logger`` and are not re-raised.
    """

    # Keys copied from every purchase-history entry, in the order they are
    # written into the stored JSON object.
    _PURCHASE_FIELDS = ('dishName', 'restaurantName', 'transactionAmount', 'transactionDate')

    @staticmethod
    def _rollback(conn):
        """Roll back *conn*, logging (instead of raising) if the rollback itself fails."""
        try:
            conn.rollback()
        except psycopg2.Error as rollback_error:
            logger.error('ERROR 403: Rollback failed.' + str(rollback_error))

    def load_users_data(self):
        """Recreate the users table and fill it from the ``users_json_data`` file.

        The table is dropped if it exists, created again with the columns
        ``id``, ``name``, ``cashBalance`` and ``purchaseHistory``, and one row
        is inserted per record of the JSON file. All statements are committed
        together once every record has been inserted.

        Every record must provide the keys ``id``, ``name``, ``cashBalance``
        and ``purchaseHistory``. A record whose ``purchaseHistory`` is empty
        is stored with a single empty JSON object as its history.

        Errors raised while reading the JSON file, or while borrowing a
        connection or opening a cursor, propagate to the caller. Errors raised
        by the table rebuild and inserts are logged, the transaction is rolled
        back, and the method returns normally.
        """
        # JSON text is UTF-8 by specification; do not depend on the locale default.
        with open(users_json_data, encoding="utf-8") as users_file:
            user_ls = json.load(users_file)

        with getconn() as conn, conn.cursor(cursor_factory=DictCursor) as cur:
            try:
                cur.execute("DROP TABLE IF EXISTS " + db_users_table + " ;")
                cur.execute('CREATE TABLE ' + db_users_table + ' (id integer PRIMARY KEY,'
                            'name varchar (550) NOT NULL,'
                            'cashBalance float8 NOT NULL,'
                            'purchaseHistory JSONB);'
                            )
                insert_statement = 'insert into ' + db_users_table + ' (id, name, cashBalance, purchaseHistory) values (%s, %s, %s, to_jsonb(%s))'
                for rec in user_ls:
                    history = rec['purchaseHistory']
                    if history:
                        dish_ls = [
                            Json({field: dish[field] for field in self._PURCHASE_FIELDS})
                            for dish in history
                        ]
                    else:
                        # Keep the existing convention: an empty history is
                        # stored as a one-element list holding an empty object.
                        dish_ls = [Json({})]
                    # NOTE(review): a Python list of Json values is presumably
                    # sent as a PostgreSQL array, so to_jsonb may store each
                    # purchase as a JSON *string* rather than an object.
                    # Confirm against the readers of this column before
                    # changing the stored shape.
                    cur.execute(insert_statement, [int(rec['id']), rec['name'], float(rec['cashBalance']), dish_ls])
                conn.commit()
            except Exception as error:
                # str(error) is required: concatenating the exception object
                # itself raises TypeError and hides the real failure.
                logger.error('ERROR 403: Transaction Error.' + str(error))
                # Undo the partial rebuild; the connection stays usable and
                # goes back to the pool when the `with` block exits.
                self._rollback(conn)

    def get_dishes(self, conn_config, cnt=-1):
        """Return rows of the users table.

        Args:
            conn_config: Open database connection to query with. It is not
                closed by this method; on failure its current transaction is
                rolled back.
            cnt: Maximum number of rows to return (an integer). The default
                ``-1`` returns every row. For backward compatibility any
                value below 1 is also treated as "no limit".

        Returns:
            A list with one ``[id, name, cashBalance, purchaseHistory]`` list
            per row, or ``None`` when the query fails (the error is logged).
        """
        with conn_config.cursor() as cur:
            try:
                cur.execute("SELECT id, name, cashBalance, purchaseHistory FROM " + db_users_table)
                rows = cur.fetchmany(cnt) if cnt > 0 else cur.fetchall()
                return [list(row) for row in rows]
            except Exception as error:
                # str(error) is required: concatenating the exception object
                # itself raises TypeError and hides the real failure.
                logger.error('ERROR 403: Transaction Error.' + str(error))
                # The connection belongs to the caller, so clear the failed
                # transaction instead of closing it.
                self._rollback(conn_config)
                return None
if __name__ == "__main__":
    # Script entry point: rebuild the users table from the JSON file, then
    # print the first stored row as a quick check.
    try:
        users_crud().load_users_data()
        with getconn() as conn_config:
            # Guard against a None result so a failed query does not surface
            # as a confusing "'NoneType' object is not iterable" error.
            for r in users_crud().get_dishes(conn_config=conn_config, cnt=1) or []:
                print(r)
    finally:
        # The pool owns its connections: rather than closing a borrowed one
        # by hand, shut the whole pool down once the script is done.
        connectionpool.closeall()