Commit 5b7ecfe

Author: Sean Sullivan (committed)

Fix bug that trapped us in while loop and made searches faster using generated params

1 parent fadfeec, commit 5b7ecfe

3 files changed: +33 -52 lines changed


elastic_datashader/elastic.py (+2 -2)

@@ -429,8 +429,8 @@ def parse_duration_interval(interval):
         "years":"y"}
     kwargs = {}
     for key,value in durations.items():
-        if interval[1] == value:
-            kwargs[key] = int(interval[0])
+        if interval[len(interval)-1] == value:
+            kwargs[key] = int(interval[0:len(interval)-1])
     return relativedelta(**kwargs)

 def convert(response, category_formatter=str):
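The old comparison looked only at interval[1] and parsed int(interval[0]), so only single-digit quantities such as "5m" parsed; something like "15m" matched no unit and came back as an empty relativedelta, a zero-length duration. The fix keys off the last character and converts the whole numeric prefix. Below is a minimal standalone sketch of the fixed logic; the full durations map is assumed, since only the "years":"y" entry is visible in this hunk.

# Sketch of the fixed parser. The durations mapping is assumed here
# (only "years":"y" appears in the hunk above); keys are relativedelta
# keyword arguments, values are single-character unit suffixes.
from dateutil.relativedelta import relativedelta

def parse_duration_interval(interval):
    durations = {"seconds": "s", "minutes": "m", "hours": "h",
                 "days": "d", "weeks": "w", "months": "M", "years": "y"}
    kwargs = {}
    for key, value in durations.items():
        # Match the unit on the last character and parse the full numeric
        # prefix, so multi-digit quantities work too.
        if interval[len(interval) - 1] == value:
            kwargs[key] = int(interval[0:len(interval) - 1])
    return relativedelta(**kwargs)

print(parse_duration_interval("15m"))  # relativedelta(minutes=+15); the old code returned an empty relativedelta
print(parse_duration_interval("2h"))   # relativedelta(hours=+2)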

elastic_datashader/parameters.py (+2 -8)

@@ -423,14 +423,7 @@ def generate_global_params(headers, params, idx):
     elif resolution == "finest":
         zoom = 7
     geotile_precision = current_zoom+zoom
-    max_value_s = copy.copy(base_s)
-    bucket = max_value_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field,precision=0,size=1)
-    resp = max_value_s.execute()
-    global_doc_cnt = resp.aggregations.comp.buckets[0].doc_count
-    if global_doc_cnt > 100000:
-        histogram_cnt = 200
-    else:
-        histogram_cnt = 500
+    histogram_cnt = 500

     if category_field:
         max_value_s = copy.copy(base_s)

@@ -445,6 +438,7 @@ def generate_global_params(headers, params, idx):
         resp = max_value_s.execute()
         estimated_points_per_tile = resp.aggregations.comp.buckets[0].doc_count
         histogram_range = estimated_points_per_tile
+        global_doc_cnt = estimated_points_per_tile
     if histogram_range > 0:
         # round to the nearest larger power of 10
         histogram_range = math.pow(
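With this change generate_global_params no longer runs a separate whole-index geotile_grid aggregation (precision 0, size 1) just to decide between 200 and 500 histogram buckets; histogram_cnt is fixed at 500 and global_doc_cnt is taken from the doc_count the category aggregation already returns. The sketch below is a hypothetical illustration of how that value could be packaged into the generated_params dict that generate_tile() reads; the "complete" and "global_doc_cnt" keys match the lookups in tilegen.py below, but this assembly code is not part of the hunk.

# Hypothetical assembly of generated_params (not shown in this hunk).
# The key names mirror what generate_tile() reads: a "complete" flag
# and the reused global_doc_cnt.
def build_generated_params(global_doc_cnt: int) -> dict:
    return {
        "complete": True,                   # tells generate_tile() the cached values are usable
        "global_doc_cnt": global_doc_cnt,   # document count reused instead of an extra query
    }

params = {"generated_params": build_generated_params(250_000)}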

elastic_datashader/tilegen.py (+29 -42)

@@ -1,7 +1,6 @@
 from dataclasses import asdict, dataclass
 from functools import lru_cache
 from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
-from datetime import datetime, timezone
 import copy
 import math
 import time
@@ -1175,19 +1174,25 @@ def generate_tile(idx, x, y, z, headers, params, tile_width_px=256, tile_height_
     spread = 1
     geotile_precision = current_zoom+zoom
     searches = []
-    if category_field:
-        max_value_s = copy.copy(base_s)
-        bucket = max_value_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field,precision=geotile_precision,size=1)
-        bucket.metric("sum","sum",field=category_field,missing=0)
-        resp = max_value_s.execute()
-        estimated_points_per_tile = resp.aggregations.comp.buckets[0].sum['value']
+
+    if params.get("generated_params", {}).get('complete',False):
+        estimated_points_per_tile = params["generated_params"]['global_doc_cnt']
         span = [0,estimated_points_per_tile]
+        logger.info("USING GENERATED PARAMS")
     else:
-        max_value_s = copy.copy(base_s)
-        max_value_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field,precision=geotile_precision,size=1)
-        resp = max_value_s.execute()
-        estimated_points_per_tile = resp.aggregations.comp.buckets[0].doc_count
-        span = [0,estimated_points_per_tile]
+        if category_field:
+            max_value_s = copy.copy(base_s)
+            bucket = max_value_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field,precision=geotile_precision,size=1)
+            bucket.metric("sum","sum",field=category_field,missing=0)
+            resp = max_value_s.execute()
+            estimated_points_per_tile = resp.aggregations.comp.buckets[0].sum['value']
+            span = [0,estimated_points_per_tile]
+        else:
+            max_value_s = copy.copy(base_s)
+            max_value_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field,precision=geotile_precision,size=1)
+            resp = max_value_s.execute()
+            estimated_points_per_tile = resp.aggregations.comp.buckets[0].doc_count
+            span = [0,estimated_points_per_tile]
     logger.info("EST Points: %s %s",estimated_points_per_tile,category_field)

     searches = []
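generate_tile now takes a fast path whenever the cached parameters are marked complete, reusing global_doc_cnt instead of issuing the per-tile max-value aggregation, and only falls back to the geotile_grid query otherwise. A small sketch of that lookup, wrapped in a hypothetical helper for clarity:

# Hypothetical helper mirroring the fast-path check above: return the
# precomputed count when generated_params is complete, else None so the
# caller falls back to the per-tile aggregation.
from typing import Optional

def cached_points_per_tile(params: dict) -> Optional[int]:
    generated = params.get("generated_params", {})
    if generated.get("complete", False):
        return generated["global_doc_cnt"]
    return None

assert cached_points_per_tile({"generated_params": {"complete": True, "global_doc_cnt": 123456}}) == 123456
assert cached_points_per_tile({}) is None  # incomplete params -> run the Elasticsearch query instead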
@@ -1384,41 +1389,23 @@ def create_time_interval_searches(base_s,subtile_bb_dict,start_time,stop_time,ti
     stime = start_time
     searches = []
     if interval == "auto":
-        subtile_s = copy.copy(base_s)
-        subtile_s = subtile_s[0:0]
-        subtile_s = subtile_s.filter("geo_bounding_box", **{geopoint_field: subtile_bb_dict})
-        subtile_s.aggs.bucket("by_time", "auto_date_histogram", field="lastupdated",buckets=546)
-        resp = subtile_s.execute()
-        interval = resp.aggregations.by_time.interval
-        #create a search for each bucket using the bucket time plus the interval
-        logger.info("Doing multiple queries based on interval %s",interval)
-
-        for bucket in resp.aggregations.by_time:
-            subtile_s = copy.copy(base_s)
-            bucket_start_time = datetime.strptime(bucket.key_as_string,"%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)
-            bucket_stop_time = bucket_start_time+ parse_duration_interval(interval)
-
-            if timestamp_field:
-                time_range = {timestamp_field: {}}
-                if bucket_start_time is not None:
-                    time_range[timestamp_field]["gte"] = bucket_start_time
-                if stop_time is not None:
-                    time_range[timestamp_field]["lte"] = bucket_stop_time
-
-            if time_range and time_range[timestamp_field]:
-                subtile_s = subtile_s.filter("range", **time_range)
-            bucket = subtile_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field,precision=geotile_precision,size=composite_agg_size,bounds=subtile_bb_dict)
-            if category_field:
-                bucket.metric("sum","sum",field=category_field,missing=0)
-            searches.append(subtile_s)
-        return searches
-
+        delta = stop_time - start_time
+        minutes = delta.total_seconds() /60
+        step = 1 #step through all the minutes in an hour to find a fit
+        while minutes/step > 546:
+            step = step +1
+        interval = str(step)+"m"
+
+    logger.info("Actual time bucket %s",interval)
     while stime < stop_time:
         subtile_s = copy.copy(base_s)
         subtile_s = subtile_s.filter("geo_bounding_box", **{geopoint_field: subtile_bb_dict})
         subtile_s = subtile_s[0:0]
         bucket_start_time = stime
-        bucket_stop_time = bucket_start_time+ parse_duration_interval(interval)
+        bucket_duration = parse_duration_interval(interval)
+        #logger.info(bucket_duration)
+        bucket_stop_time = bucket_start_time+ bucket_duration
+        bucket_stop_time = min(bucket_stop_time, stop_time)
         time_range = {timestamp_field: {}}
         time_range[timestamp_field]["gte"] = bucket_start_time
         time_range[timestamp_field]["lte"] = bucket_stop_time
