
Commit 996d1fa

Sean Sullivan committed:
Add support for plotting geo_shapes
1 parent 298a58f

File tree

2 files changed: +163 −22 lines


elastic_datashader/elastic.py (+75 −1)
@@ -158,6 +158,20 @@ def convert_nm_to_ellipse_units(distance: float, units: str) -> float:
     # NB. assume "majmin_m" if any others
     return distance * 1852
 
+def get_field_type(elastic_hosts: str, headers: Optional[str], params: Dict[str, Any], field: str, idx: str) -> str:
+    user = params.get("user")
+    x_opaque_id = params.get("x-opaque-id")
+    es = Elasticsearch(
+        elastic_hosts.split(","),
+        verify_certs=False,
+        timeout=900,
+        headers=get_es_headers(headers, user, x_opaque_id),
+    )
+    mappings = es.indices.get_field_mapping(fields=field, index=idx)
+    # e.g. {'foot_prints': {'mappings': {'foot_print': {'full_name': 'foot_print', 'mapping': {'foot_print': {'type': 'geo_shape'}}}}}}
+    index = list(mappings.keys())[0]  # if idx is a pattern like my_index*, the key is the concrete index name
+    return mappings[index]['mappings'][field]['mapping'][field]['type']
+
 def get_search_base(
     elastic_hosts: str,
     headers: Optional[str],
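
For reference, the nested response that get_field_type unwraps has the shape shown in the inline comment above. A minimal sketch of the lookup, using a hypothetical mapping response (index and field names are illustrative):

    # Hypothetical get_field_mapping response for a geo_shape field.
    mappings = {
        "foot_prints": {
            "mappings": {
                "foot_print": {
                    "full_name": "foot_print",
                    "mapping": {"foot_print": {"type": "geo_shape"}},
                }
            }
        }
    }
    field = "foot_print"
    index = list(mappings.keys())[0]  # concrete index name, even when queried via a pattern
    assert mappings[index]["mappings"][field]["mapping"][field]["type"] == "geo_shape"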
@@ -497,9 +511,12 @@ def geotile_bucket_to_lonlat(bucket):
     if hasattr(bucket, "centroid"):
         lon = bucket.centroid.location.lon
         lat = bucket.centroid.location.lat
-    else:
+    elif hasattr(bucket.key, 'grids'):
         z, x, y = [ int(x) for x in bucket.key.grids.split("/") ]
         lon, lat = mu.center(x, y, z)
+    else:
+        z, x, y = [ int(x) for x in bucket.key.split("/") ]
+        lon, lat = mu.center(x, y, z)
     return lon, lat
 
 def split_fieldname_to_list(field: str) -> List[str]:
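
The bucket keys produced by a geotile_grid aggregation are "zoom/x/y" strings, which is what the new fallback branch parses. For illustration, the tile center can be computed with the standard web-mercator formula (shown only to illustrate what the module's mu.center returns):

    import math

    def tile_center(x: int, y: int, z: int):
        # Standard slippy-map tile center, for a parsed "z/x/y" bucket key.
        n = 2 ** z
        lon = (x + 0.5) / n * 360.0 - 180.0
        lat = math.degrees(math.atan(math.sinh(math.pi * (1 - 2 * (y + 0.5) / n))))
        return lon, lat

    z, x, y = [int(v) for v in "6/18/23".split("/")]
    print(tile_center(x, y, z))  # approx (-75.94, 43.06)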
@@ -554,6 +571,63 @@ def chunk_iter(iterable, chunk_size):
         last_written_idx = (i % chunk_size)
         yield (False, chunks[0:last_written_idx+1])
 
+def bucket_noop(bucket, search):
+    return bucket
+
+class Scan:
+    def __init__(self, searches, inner_aggs=None, field=None, precision=None, size=10, timeout=None, bucket_callback=bucket_noop):
+        self.field = field
+        self.precision = precision
+        self.searches = searches
+        self.inner_aggs = inner_aggs if inner_aggs is not None else {}
+        self.size = size
+        self.num_searches = 0
+        self.total_took = 0
+        self.total_shards = 0
+        self.total_skipped = 0
+        self.total_successful = 0
+        self.total_failed = 0
+        self.timeout = timeout
+        self.aborted = False
+        self.bucket_callback = bucket_callback
+        if self.bucket_callback is None:
+            self.bucket_callback = bucket_noop
+
+    def execute(self):
+        """
+        Iterate over the geotile buckets of each search in ``searches``,
+        yielding each bucket after passing it through ``bucket_callback``.
+        """
+        self.num_searches = 0
+        self.total_took = 0
+        self.aborted = False
+
+        def run_search(s, **kwargs):
+            _timeout_at = kwargs.pop("timeout_at", None)
+            if _timeout_at:
+                _time_remaining = _timeout_at - int(time.time())
+                s = s.params(timeout=f"{_time_remaining}s")
+            if self.field and self.precision:
+                s.aggs.bucket("comp", "geotile_grid", field=self.field, precision=self.precision, size=self.size)
+            #logger.info(json.dumps(s.to_dict(), indent=2, default=str))
+            return s.execute()
+
+        timeout_at = None
+        if self.timeout:
+            timeout_at = int(time.time()) + self.timeout
+        for search in self.searches:
+            response = run_search(search, timeout_at=timeout_at)
+            self.num_searches += 1
+            self.total_took += response.took
+            self.total_shards += response._shards.total  # pylint: disable=W0212
+            self.total_skipped += response._shards.skipped  # pylint: disable=W0212
+            self.total_successful += response._shards.successful  # pylint: disable=W0212
+            self.total_failed += response._shards.failed  # pylint: disable=W0212
+            for b in response.aggregations.comp.buckets:
+                b = self.bucket_callback(b, self)
+                yield b
+
+
 class ScanAggs:
     def __init__(self, search, source_aggs, inner_aggs=None, size=10, timeout=None):
         self.search = search
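
A minimal usage sketch of the new Scan helper (not from the commit; the Search object and field name are illustrative, and Scan attaches the geotile_grid aggregation itself when field and precision are given):

    from elasticsearch_dsl import Search

    s = Search(index="foot_prints")[0:0]      # hypothetical index; no hits needed
    scan = Scan(
        [s],                                  # one pre-filtered search per subtile
        field="foot_print",                   # hypothetical geo field
        precision=12,                         # geotile_grid precision
        size=65536,
        timeout=30,
        bucket_callback=None,                 # None falls back to bucket_noop
    )
    for bucket in scan.execute():
        print(bucket.key, bucket.doc_count)   # keys are "zoom/x/y" strings
    print(scan.num_searches, scan.total_took)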

elastic_datashader/tilegen.py (+88 −21)
@@ -32,11 +32,13 @@
     gen_overlay,
 )
 from .elastic import (
+    get_field_type,
     get_search_base,
     convert_composite,
     split_fieldname_to_list,
     get_nested_field_from_hit,
     to_32bit_float,
+    Scan,
     ScanAggs,
     get_tile_categories,
     scan
@@ -534,6 +536,9 @@ def get_span_upper_bound(span_range: str, estimated_points_per_tile: Optional[int]
     if span_range == "wide":
         return math.log(1e9)
 
+    if span_range == "ultrawide":
+        return math.log(1e308)
+
     assert estimated_points_per_tile is not None
     return math.log(max(estimated_points_per_tile * 2, 2))

@@ -981,7 +986,7 @@ def generate_tile(idx, x, y, z, headers, params, tile_width_px=256, tile_height_px=256):
     base_s = get_search_base(config.elastic_hosts, headers, params, idx)
 
     # Now find out how many documents
-    count_s = copy.copy(base_s)
+    count_s = copy.copy(base_s)[0:0]  # the slice sets from/size to 0; we only aggregate, so hits aren't needed
     count_s = count_s.filter("geo_bounding_box", **{geopoint_field: bb_dict})
 
     doc_cnt = count_s.count()
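
In elasticsearch_dsl, slicing a Search sets from/size on the request body, so the [0:0] slice asks Elasticsearch for zero hits while counts and aggregations still run. A quick illustration (index name hypothetical):

    from elasticsearch_dsl import Search

    s = Search(index="my_index")[0:0]
    print(s.to_dict())  # {'from': 0, 'size': 0}; any query/aggs ride alongside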
@@ -1109,26 +1114,89 @@ def generate_tile(idx, x, y, z, headers, params, tile_width_px=256, tile_height_px=256):
 
     # the composite needs one bin for 'after_key'
     composite_agg_size = int(max_bins / inner_agg_size) - 1
-
-    resp = ScanAggs(
-        tile_s,
-        {"grids": A("geotile_grid", field=geopoint_field, precision=geotile_precision)},
-        inner_aggs,
-        size=composite_agg_size,
-        timeout=config.query_timeout_seconds
-    )
-
+    field_type = get_field_type(config.elastic_hosts, headers, params, geopoint_field, idx)
     partial_data = False  # TODO can we get partial data?
-    df = pd.DataFrame(
-        convert_composite(
-            resp.execute(),
-            (category_field is not None),
-            bool(category_filters),
-            histogram_interval,
-            category_type,
-            category_format
+    if field_type == "geo_point":
+        resp = ScanAggs(
+            tile_s,
+            {"grids": A("geotile_grid", field=geopoint_field, precision=geotile_precision)},
+            inner_aggs,
+            size=composite_agg_size,
+            timeout=config.query_timeout_seconds
         )
-    )
+
+        df = pd.DataFrame(
+            convert_composite(
+                resp.execute(),
+                (category_field is not None),
+                bool(category_filters),
+                histogram_interval,
+                category_type,
+                category_format
+            )
+        )
+        estimated_points_per_tile = get_estimated_points_per_tile(span_range, global_bounds, z, global_doc_cnt)
+    elif field_type == "geo_shape":
+        shape_s = copy.copy(tile_s)
+        searches = []
+        estimated_points_per_tile = 10000
+        zoom = 0
+        #span_range = "ultrawide"
+        if resolution == "coarse":
+            zoom = 5
+            spread = 7
+        elif resolution == "fine":
+            zoom = 6
+            spread = 3
+        elif resolution == "finest":
+            zoom = 7
+            spread = 1
+        composite_agg_size = 65536  # max agg bucket size
+        geotile_precision = current_zoom + zoom
+        subtile_bb_dict = create_bounding_box_for_tile(x, y, z)
+        subtile_s = copy.copy(base_s)
+        subtile_s = subtile_s[0:0]
+        subtile_s = subtile_s.filter("geo_bounding_box", **{geopoint_field: subtile_bb_dict})
+        subtile_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field, precision=geotile_precision, size=composite_agg_size, bounds=subtile_bb_dict)
+        searches.append(subtile_s)
+        cmap = "bmy"  # TODO have the front end pass the cmap for non-categorical data
+
+        def calc_aggregation(bucket, search):
+            # get bounds from bucket.key, then run a secondary search for the
+            # aggregated value of category_field within those bounds
+            z, x, y = [ int(x) for x in bucket.key.split("/") ]
+            bucket_bb_dict = create_bounding_box_for_tile(x, y, z)
+            subtile_s = copy.copy(base_s)
+            subtile_s.aggs.bucket("sum", "median_absolute_deviation", field=category_field, missing=0)
+            subtile_s = subtile_s[0:0]
+            subtile_s = subtile_s.filter("geo_bounding_box", **{geopoint_field: bucket_bb_dict})
+            response = subtile_s.execute()
+            search.num_searches += 1
+            search.total_took += response.took
+            search.total_shards += response._shards.total  # pylint: disable=W0212
+            search.total_skipped += response._shards.skipped  # pylint: disable=W0212
+            search.total_successful += response._shards.successful  # pylint: disable=W0212
+            search.total_failed += response._shards.failed  # pylint: disable=W0212
+            bucket.doc_count = response.aggregations.sum['value']  # replace doc_count with the aggregated category_field value
+            return bucket
+
+        bucket_callback = None
+        if category_field:
+            bucket_callback = calc_aggregation
+        resp = Scan(searches, timeout=config.query_timeout_seconds, bucket_callback=bucket_callback)
+        df = pd.DataFrame(
+            convert_composite(
+                resp.execute(),
+                False,  # no categorical handling: ES doesn't support composite buckets for geo_shapes, so it's computed by a secondary search in bucket_callback
+                False,  # no filter_buckets, for the same reason
+                histogram_interval,
+                category_type,
+                category_format
+            )
+        )
+        if len(df) / resp.num_searches == composite_agg_size:
+            logger.warning("clipping on tile %s", [x, y, z])
 
     s2 = time.time()
     logger.info("ES took %s (%s) for %s with %s searches", (s2 - s1), resp.total_took, len(df), resp.num_searches)
@@ -1142,7 +1210,6 @@ def generate_tile(idx, x, y, z, headers, params, tile_width_px=256, tile_height_px=256):
     metrics["shards_failed"] = resp.total_failed
     logger.info("%s", metrics)
 
-    estimated_points_per_tile = get_estimated_points_per_tile(span_range, global_bounds, z, global_doc_cnt)
 
     if len(df.index) == 0:
         img = gen_empty(tile_width_px, tile_height_px)
@@ -1154,7 +1221,7 @@ def generate_tile(idx, x, y, z, headers, params, tile_width_px=256, tile_height_px=256):
 
     ###############################################################
     # Category Mode
-    if category_field:
+    if category_field and field_type != "geo_shape":
         # TODO it would be nice if datashader honored the category orders
         # in z-order, then we could make "Other" drawn underneath the less
         # prominent colors