From 8a63ce41c75b2d78399b96023897e800e0871dc7 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 12 Dec 2023 13:39:56 -0800 Subject: [PATCH] add lock to flow control --- google/cloud/bigtable/batcher.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/google/cloud/bigtable/batcher.py b/google/cloud/bigtable/batcher.py index a6eb806e9..82c0bdb91 100644 --- a/google/cloud/bigtable/batcher.py +++ b/google/cloud/bigtable/batcher.py @@ -110,6 +110,7 @@ def __init__( self.inflight_size = 0 self.event = threading.Event() self.event.set() + self._lock = threading.Lock() def is_blocked(self): """Returns True if: @@ -128,9 +129,10 @@ def control_flow(self, batch_info): Calculate the resources used by this batch """ - self.inflight_mutations += batch_info.mutations_count - self.inflight_size += batch_info.mutations_size - self.set_flow_control_status() + with self._lock: + self.inflight_mutations += batch_info.mutations_count + self.inflight_size += batch_info.mutations_size + self.set_flow_control_status() def wait(self): """ @@ -154,9 +156,10 @@ def release(self, batch_info): Release the resources. Decrement the row size to allow enqueued mutations to be run. """ - self.inflight_mutations -= batch_info.mutations_count - self.inflight_size -= batch_info.mutations_size - self.set_flow_control_status() + with self._lock: + self.inflight_mutations -= batch_info.mutations_count + self.inflight_size -= batch_info.mutations_size + self.set_flow_control_status() class MutationsBatcher(object):