Skip to content

Commit

Permalink
move to async batches (#75)
Browse files Browse the repository at this point in the history
* move to async batches

* fix build

* fix tests

* fix perf

* cap async tasks

* pull 1 task at time

* Update query_buffer.py

* Update query_buffer.py

Co-authored-by: Roi Lipman <swilly22@users.noreply.github.com>
  • Loading branch information
AviAvni and swilly22 authored Sep 14, 2021
1 parent 4c7f57a commit f42578c
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 7 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ vulture = "^2.3"
pytest = "^6.2.4"
pytest-cov = "^2.12.1"
redisgraph = "^2.4.0"
pathos = "^0.2.8"

[build-system]
requires = ["poetry-core>=1.0.0"]
Expand Down
1 change: 1 addition & 0 deletions redisgraph_bulk_loader/bulk_insert.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ def bulk_insert(graph, host, port, password, user, unix_socket_path, ssl_keyfile

# Send all remaining tokens to Redis
query_buf.send_buffer()
query_buf.wait_pool()

end_time = timer()
query_buf.report_completion(end_time - start_time)
Expand Down
2 changes: 1 addition & 1 deletion redisgraph_bulk_loader/label.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def update_node_dictionary(self, identifier):

def process_entities(self):
entities_created = 0
with click.progressbar(self.reader, length=self.entities_count, label=self.entity_str) as reader:
with click.progressbar(self.reader, length=self.entities_count, label=self.entity_str, update_min_steps=100) as reader:
for row in reader:
self.validate_row(row)

Expand Down
34 changes: 29 additions & 5 deletions redisgraph_bulk_loader/query_buffer.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
from pathos.pools import ThreadPool as Pool

def run(client, graphname, args):
    """Execute one GRAPH.BULK command on *client* for *graphname*.

    Returns the server's reply split on b', ' into its per-token stat
    fields (a list of byte strings such as b'7 nodes created').
    """
    reply = client.execute_command("GRAPH.BULK", graphname, *args)
    return reply.split(b", ")

class QueryBuffer:
def __init__(self, graphname, client, config):
self.nodes = None
Expand Down Expand Up @@ -30,7 +37,9 @@ def __init__(self, graphname, client, config):
self.nodes_created = 0 # Total number of nodes created
self.relations_created = 0 # Total number of relations created

# TODO consider using a queue to send commands asynchronously
self.pool = Pool(nodes=1)
self.tasks = []

def send_buffer(self):
"""Send all pending inserts to Redis"""
# Do nothing if we have no entities
Expand All @@ -43,10 +52,8 @@ def send_buffer(self):
args.insert(0, "BEGIN")
self.initial_query = False

result = self.client.execute_command("GRAPH.BULK", self.graphname, *args)
stats = result.split(', '.encode())
self.nodes_created += int(stats[0].split(' '.encode())[0])
self.relations_created += int(stats[1].split(' '.encode())[0])
task = self.pool.apipe(run, self.client, self.graphname, args)
self.add_task(task)

self.clear_buffer()

Expand All @@ -59,6 +66,23 @@ def clear_buffer(self):
self.buffer_size = 0
self.node_count = 0
self.relation_count = 0

def add_task(self, task):
self.tasks.append(task)
if len(self.tasks) == 5:
task = self.tasks.pop(0)
stats = task.get()
self.update_stats(stats)

def wait_pool(self):
for task in self.tasks:
stats = task.get()
self.update_stats(stats)
self.tasks.clear()

def update_stats(self, stats):
self.nodes_created += int(stats[0].split(' '.encode())[0])
self.relations_created += int(stats[1].split(' '.encode())[0])

def report_completion(self, runtime):
print("Construction of graph '%s' complete: %d nodes created, %d relations created in %f seconds"
Expand Down
2 changes: 1 addition & 1 deletion redisgraph_bulk_loader/relation_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def post_process_header_with_schema(self, header):

def process_entities(self):
entities_created = 0
with click.progressbar(self.reader, length=self.entities_count, label=self.entity_str) as reader:
with click.progressbar(self.reader, length=self.entities_count, label=self.entity_str, update_min_steps=100) as reader:
for row in reader:
self.validate_row(row)
try:
Expand Down

0 comments on commit f42578c

Please sign in to comment.