Skip to content

Commit 2ef0440

Browse files
authored
Update PyLagoon (#4)
Updated PyLagoon's query methods to allow them to support cross-source queries and added a PyLagoon example which corresponds to the annoucement blog post. Also fixed the image name in the docker-compose file and some other references in the documentation.
1 parent 9fc0e33 commit 2ef0440

File tree

11 files changed

+528
-187
lines changed

11 files changed

+528
-187
lines changed

.gitignore

+3
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,6 @@ RubyLagoon/vendor
5050
.vscode
5151

5252
clients/RLagoon/_libs
53+
54+
# Extra notebook files
55+
.ipynb_checkpoints

clients/PyLagoon/PyLagoon/datalake.py renamed to clients/PyLagoon/PyLagoon/lagoon.py

+75-44
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import pandas as pd
1616
import json
1717
import os.path
18+
from typing import List
1819

1920
from PyLagoon.config import LagoonConfig
2021
from PyLagoon.postgresql import PGMeta, build_sql_query
@@ -36,9 +37,10 @@ def __init__(self, config, host=None, port=None):
3637
self.__cookies = self.__connect()
3738

3839
def __connect(self):
39-
reply = req.post(self.conn_str + "user/login",
40-
json={"user": self.__cfg.USER,
41-
"pass": self.__cfg.PASSWORD})
40+
reply = req.post(
41+
self.conn_str + "user/login",
42+
json={"user": self.__cfg.USER, "pass": self.__cfg.PASSWORD},
43+
)
4244
if reply.ok:
4345
return reply.cookies
4446
else:
@@ -61,9 +63,7 @@ def sources(self, ontoClass=None, tags=None, columns=None, **kwargs):
6163
kwargs["tag"] = tags
6264
if columns:
6365
kwargs["columns"] = columns
64-
reply = req.get(self.conn_str + "sources",
65-
params=kwargs,
66-
cookies=self.__cookies)
66+
reply = req.get(self.conn_str + "sources", params=kwargs, cookies=self.__cookies)
6767
return [Source(j) for j in reply.json()]
6868

6969
def ingest(self, file_path, name, ontoClass=None, tags=None, **kwargs):
@@ -79,12 +79,14 @@ def ingest(self, file_path, name, ontoClass=None, tags=None, **kwargs):
7979
kwargs["tag"] = tags
8080
kwargs["name"] = name
8181
kwargs["input"] = os.path.split(file_path)[1]
82-
# So the server can guess the fileType
83-
reply = req.post(self.conn_str + "sources",
84-
data=open(file_path, "rb"),
85-
params=kwargs,
86-
stream=True,
87-
cookies=self.__cookies)
82+
# So the server can guess the fileType
83+
reply = req.post(
84+
self.conn_str + "sources",
85+
data=open(file_path, "rb"),
86+
params=kwargs,
87+
stream=True,
88+
cookies=self.__cookies,
89+
)
8890
report = (json.loads(line.decode("utf-8")) for line in reply.raw)
8991
stack = []
9092
last = None
@@ -116,38 +118,67 @@ def users(self):
116118
reply = req.get(self.conn_str + "users")
117119
return reply.json()
118120

119-
def tbl(self, source=None, query=None):
120-
"""tbl() in RLagoon
121-
122-
Give one of source or query.
123-
124-
source is a Source, query is an sqlalchemy.orm.query.Query created
125-
through use of PyLagoon.postgresql.PGMeta and the sqlalchemy EDSL.
121+
def download_source(self, source):
122+
"""Constructs a DataFrame containing an entire source
126123
"""
124+
is_json = any(c["type"][0] == "JSON" for c in source.columns)
125+
if is_json:
126+
# We need a JSON document in that case
127+
# the sql endpoint will return one
128+
meta = PGMeta([source])
129+
table = meta[source]
130+
return self.__tbl_from_raw_sql(build_sql_query(meta.query(table)))
131+
else:
132+
reply = req.get(
133+
self.conn_str + "source/" + str(source.ix) + "/download",
134+
stream=True,
135+
cookies=self.__cookies,
136+
)
137+
if reply.ok:
138+
return pd.read_csv(reply.text)
139+
140+
def download_query(self, query, sources):
141+
"""Constructs a DataFrame from a SQLAlchemy query and corresponding sources
142+
143+
Note that this method will sequentially search for each columns type in the list
144+
of sources and take the first match. This is necessary since query results only
145+
include column names and not data source identifiers.
146+
"""
147+
return self.__tbl_from_raw_sql(build_sql_query(query), sources)
127148

128-
if source:
129-
is_json = any(c["type"][0] == "JSON" for c in source.columns)
130-
if is_json:
131-
# We need a JSON document in that case
132-
# the sql endpoint will return one
133-
meta = PGMeta([source])
134-
table = meta[source]
135-
return self.__tbl_from_raw_sql(build_sql_query(meta.query(table)))
136-
else:
137-
reply = req.get(self.conn_str + "source/" + str(source.ix) + "/download",
138-
stream=True,
139-
cookies=self.__cookies)
140-
if reply.ok:
141-
return pd.read_csv(reply.text)
142-
elif query:
143-
return self.__tbl_from_raw_sql(build_sql_query(query))
144-
145-
def __tbl_from_raw_sql(self, query):
149+
def __tbl_from_raw_sql(self, query, sources):
146150
reply = req.post(
147-
self.conn_str + "sql",
148-
json={"sql": query},
149-
stream=True,
150-
cookies=self.__cookies)
151-
if reply.ok:
152-
# return pd.read_json(reply.raw)
153-
return pd.DataFrame(reply.json())
151+
self.conn_str + "sql", json={"sql": query}, stream=True, cookies=self.__cookies
152+
)
153+
reply.raise_for_status()
154+
return _query_to_df(reply.json(), sources)
155+
156+
157+
def _group_rows(rows: List[dict]):
158+
columns = {}
159+
for row in rows:
160+
for c, v in row.items():
161+
if c in columns:
162+
columns[c].append(v)
163+
else:
164+
columns[c] = [v]
165+
return columns
166+
167+
168+
def _get_dtype(col_name, sources):
169+
for source in sources:
170+
if col_name in source.col_types:
171+
return source.col_types[col_name]
172+
return object
173+
174+
175+
def _query_to_df(rows, sources):
176+
grouped = _group_rows(rows)
177+
col_names = list(grouped.keys())
178+
series = []
179+
for name in col_names:
180+
vals = grouped.pop(name)
181+
series.append(pd.Series(vals, name=name, dtype=_get_dtype(name, sources)))
182+
df = pd.concat(series, axis=1)
183+
df.columns = col_names
184+
return df

clients/PyLagoon/PyLagoon/postgresql.py

+17-12
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from PyLagoon.config import LagoonConfig
2525
from PyLagoon.source import Source
2626

27+
2728
class PGMeta:
2829
"""Given a list of Sources, builds the classes necessary
2930
to the querying EDSL.
@@ -34,7 +35,7 @@ def __init__(self, sources):
3435
self.__md = sa.MetaData()
3536
for s in sources:
3637
self.__add_source_to_md(s)
37-
self.__base = automap_base(metadata = self.__md)
38+
self.__base = automap_base(metadata=self.__md)
3839
self.__base.prepare()
3940

4041
@property
@@ -48,8 +49,9 @@ def __getitem__(self, key):
4849
return self.__base.classes[key]
4950

5051
def __sql_column_from_json_column(self, col):
51-
typ = col["type"] \
52-
.replace(" ", "_").replace("DOCUMENT", "TEXT") # "DOUBLE PRECISION" -> "DOUBLE_PRECISION"
52+
typ = (
53+
col["type"].replace(" ", "_").replace("DOCUMENT", "TEXT")
54+
) # "DOUBLE PRECISION" -> "DOUBLE_PRECISION"
5355
subtyp = None
5456
sql_typ = None
5557
if isinstance(typ, list):
@@ -66,10 +68,13 @@ def __sql_column_from_json_column(self, col):
6668
return sa.Column(col["inView"], sql_typ)
6769

6870
def __add_source_to_md(self, source):
69-
sa.Table(source.view_name, self.__md,
70-
sa.Column("ix", sa.Integer, primary_key=True),
71-
*(self.__sql_column_from_json_column(c)
72-
for c in source.columns))
71+
sa.Table(
72+
source.view_name,
73+
self.__md,
74+
sa.Column("ix", sa.Integer, primary_key=True),
75+
*(self.__sql_column_from_json_column(c) for c in source.columns.values()),
76+
schema=source.schema
77+
)
7378

7479
def query(self, *sources):
7580
"""Starts a query on the given sources, which can
@@ -82,9 +87,9 @@ def query(self, *sources):
8287

8388
if len(sources) == 0:
8489
sources = self.__view_names
85-
return Session().query(*((self[s] if isinstance(s,str) or isinstance(s,Source)
86-
else s)
87-
for s in sources))
90+
return Session().query(
91+
*((self[s] if isinstance(s, str) or isinstance(s, Source) else s) for s in sources)
92+
)
8893

8994

9095
def build_sql_query(query):
@@ -93,8 +98,8 @@ def build_sql_query(query):
9398
to the lagoon-server"""
9499

95100
d = psql.dialect()
96-
q = query.statement.compile(dialect = d)
97-
#The following is not ideal, as q.params and str(q) should
101+
q = query.statement.compile(dialect=d)
102+
# The following is not ideal, as q.params and str(q) should
98103
# normally be passed separately to the PostgreSQL database:
99104
ps = {}
100105
for k, v in q.params.items():

clients/PyLagoon/PyLagoon/source.py

+44-4
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,34 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14+
import pandas as pd
15+
import numpy as np
16+
17+
LAGOON_TYPES = {
18+
"BOOLEAN": bool,
19+
"INTEGER": int,
20+
"BIGINT": pd.Int64Dtype,
21+
"DOUBLE PRECISION": np.float64,
22+
"TEXT": str,
23+
"ARR": object,
24+
"DOCUMENT": object,
25+
"JSON": object,
26+
}
27+
UNKNOWN_COLUMN_TYPE = object
28+
29+
1430
class Source:
1531
"""A wrapper for the json description of sources
1632
returned by lagoon-server"""
17-
33+
34+
INDEX_COL = "ix"
35+
INDEX_COL_TYPE = int
36+
1837
def __init__(self, json):
1938
self.__json = json
2039

2140
def __str__(self):
22-
return "<Source: ix={0}, view_name={1}>".format(
23-
self.ix, self.view_name)
41+
return "<Source: ix={0}, view_name={1}>".format(self.ix, self.view_name)
2442

2543
def __repr__(self):
2644
return str(self)
@@ -35,8 +53,30 @@ def view_name(self):
3553

3654
@property
3755
def columns(self):
38-
return self.__json["columns"]
56+
return {get_column_name(c): c for c in self.__json["columns"]}
3957

4058
@property
4159
def _json(self):
4260
return self.__json
61+
62+
@property
63+
def schema(self):
64+
return self.__json["schema"]
65+
66+
@property
67+
def col_types(self):
68+
types = {name: get_column_type(col) for name, col in self.columns.items()}
69+
types[self.INDEX_COL] = self.INDEX_COL_TYPE
70+
return types
71+
72+
73+
def get_column_type(column: dict):
74+
column_type = column["type"]
75+
if column_type in LAGOON_TYPES:
76+
return LAGOON_TYPES[column_type]
77+
else:
78+
return UNKNOWN_COLUMN_TYPE
79+
80+
81+
def get_column_name(column: dict):
82+
return column["inView"]

0 commit comments

Comments
 (0)