
Commit 20cea00

Revert "Merge pull request #16 from mpozniak95/fix-sync"
This reverts commit 85a6bc7, reversing changes made to ba175b1.
1 parent 85a6bc7 commit 20cea00

3 files changed: +20 -86 lines

datasets/datasets.json (+0, -17)

```diff
@@ -1203,22 +1203,5 @@
     "type": "tar",
     "link": "https://storage.googleapis.com/ann-filtered-benchmark/datasets/random_keywords_1m_vocab_10_no_filters.tgz",
     "path": "random-100-match-kw-small-vocab/random_keywords_1m_vocab_10_no_filters"
-  },
-  {
-    "name": "laion-img-emb-512-1M-cosine",
-    "vector_size": 512,
-    "distance": "cosine",
-    "type": "h5",
-    "path": "laion-img-emb-512/laion-img-emb-512-1M-cosine.hdf5",
-    "link": "http://benchmarks.redislabs.s3.amazonaws.com/vecsim/laion400m/laion-img-emb-512-100M-cosine.hdf5"
-  },
-  {
-    "name": "laion-img-emb-512-1M-100ktrain-cosine",
-    "vector_size": 512,
-    "distance": "cosine",
-    "type": "h5",
-    "path": "laion-img-emb-512/laion-img-emb-512-1M-100ktrain-cosine.hdf5",
-    "link": "http://benchmarks.redislabs.s3.amazonaws.com/vecsim/laion400m/laion-img-emb-512-100M-cosine.hdf5"
   }
-
 ]
```
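For context, datasets.json is a flat JSON array of dataset descriptors like the two laion entries removed above. A minimal, hypothetical sketch of filtering that registry by entry type; the helper below is illustrative only and not part of the repository's dataset reader:

```python
import json

# Hypothetical helper, for illustration only: list dataset entries of a given
# "type" (e.g. "h5" or "tar") from the datasets/datasets.json registry.
def entries_of_type(registry_path, dataset_type):
    with open(registry_path) as f:
        entries = json.load(f)
    return [e for e in entries if e.get("type") == dataset_type]

# After this revert, the two laion "h5" entries are no longer listed here.
h5_names = [e["name"] for e in entries_of_type("datasets/datasets.json", "h5")]
print(h5_names)
```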

engine/base_client/client.py (+3, -6)

```diff
@@ -40,9 +40,8 @@ def save_search_results(
     ):
         now = datetime.now()
         timestamp = now.strftime("%Y-%m-%d-%H-%M-%S")
-        pid = os.getpid() # Get the current process ID
         experiments_file = (
-            f"{self.name}-{dataset_name}-search-{search_id}-{pid}-{timestamp}.json"
+            f"{self.name}-{dataset_name}-search-{search_id}-{timestamp}.json"
         )
         result_path = RESULTS_DIR / experiments_file
         with open(result_path, "w") as out:
@@ -98,8 +97,7 @@ def run_experiment(
         reader = dataset.get_reader(execution_params.get("normalize", False))
 
         if skip_if_exists:
-            pid = os.getpid() # Get the current process ID
-            glob_pattern = f"{self.name}-{dataset.config.name}-search-{pid}-*-*.json"
+            glob_pattern = f"{self.name}-{dataset.config.name}-search-*-*.json"
             existing_results = list(RESULTS_DIR.glob(glob_pattern))
             if len(existing_results) == len(self.searchers):
                 print(
@@ -137,9 +135,8 @@ def run_experiment(
         print("Experiment stage: Search")
         for search_id, searcher in enumerate(self.searchers):
             if skip_if_exists:
-                pid = os.getpid() # Get the current process ID
                 glob_pattern = (
-                    f"{self.name}-{dataset.config.name}-search-{search_id}-{pid}-*.json"
+                    f"{self.name}-{dataset.config.name}-search-{search_id}-*.json"
                 )
                 existing_results = list(RESULTS_DIR.glob(glob_pattern))
                 print("Pattern", glob_pattern, "Results:", existing_results)
```
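The reverted lines drop the process ID from both the saved result filename and the glob patterns used by skip_if_exists, so a result file is identified by engine name, dataset, search config index, and timestamp alone. A small sketch of how the restored naming and matching fit together, with illustrative values standing in for the engine and dataset names:

```python
from datetime import datetime
from pathlib import Path

RESULTS_DIR = Path("results")  # assumed location; the repo defines its own RESULTS_DIR

# Illustrative values standing in for self.name, the dataset name, and the searcher index.
name, dataset_name, search_id = "engine", "example-dataset", 0

# Restored naming scheme: {engine}-{dataset}-search-{search_id}-{timestamp}.json
timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
experiments_file = f"{name}-{dataset_name}-search-{search_id}-{timestamp}.json"

# Restored skip_if_exists check: one glob per search config, with no PID component.
glob_pattern = f"{name}-{dataset_name}-search-{search_id}-*.json"
existing_results = list(RESULTS_DIR.glob(glob_pattern))
print("Pattern", glob_pattern, "Results:", existing_results)
```

The broader `search-*-*.json` pattern in run_experiment still matches these filenames, since the glob wildcards span the remaining dash-separated fields.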

engine/base_client/search.py (+17, -63)

```diff
@@ -1,6 +1,6 @@
 import functools
 import time
-from multiprocessing import get_context, Barrier, Process, Queue
+from multiprocessing import get_context
 from typing import Iterable, List, Optional, Tuple
 import itertools
 
@@ -65,10 +65,6 @@ def search_all(
     ):
         parallel = self.search_params.get("parallel", 1)
         top = self.search_params.get("top", None)
-
-        # Convert queries to a list to calculate its length
-        queries = list(queries) # This allows us to calculate len(queries)
-
         # setup_search may require initialized client
         self.init_client(
             self.host, distance, self.connection_params, self.search_params
@@ -84,56 +80,31 @@ def search_all(
             print(f"Limiting queries to [0:{MAX_QUERIES-1}]")
 
         if parallel == 1:
-            # Single-threaded execution
             start = time.perf_counter()
-
-            results = [search_one(query) for query in tqdm.tqdm(queries)]
-            total_time = time.perf_counter() - start
-
+            precisions, latencies = list(
+                zip(*[search_one(query) for query in tqdm.tqdm(used_queries)])
+            )
         else:
-            # Dynamically calculate chunk size
-            chunk_size = max(1, len(queries) // parallel)
-            query_chunks = list(chunked_iterable(queries, chunk_size))
+            ctx = get_context(self.get_mp_start_method())
 
-            # Function to be executed by each worker process
-            def worker_function(chunk, result_queue):
-                self.__class__.init_client(
+            with ctx.Pool(
+                processes=parallel,
+                initializer=self.__class__.init_client,
+                initargs=(
                     self.host,
                     distance,
                     self.connection_params,
                     self.search_params,
+                ),
+            ) as pool:
+                if parallel > 10:
+                    time.sleep(15) # Wait for all processes to start
+                start = time.perf_counter()
+                precisions, latencies = list(
+                    zip(*pool.imap_unordered(search_one, iterable=tqdm.tqdm(used_queries)))
                 )
-                self.setup_search()
-                results = process_chunk(chunk, search_one)
-                result_queue.put(results)
-
-            # Create a queue to collect results
-            result_queue = Queue()
-
-            # Create and start worker processes
-            processes = []
-            for chunk in query_chunks:
-                process = Process(target=worker_function, args=(chunk, result_queue))
-                processes.append(process)
-                process.start()
-
-            # Start measuring time for the critical work
-            start = time.perf_counter()
 
-            # Collect results from all worker processes
-            results = []
-            for _ in processes:
-                results.extend(result_queue.get())
-
-            # Wait for all worker processes to finish
-            for process in processes:
-                process.join()
-
-            # Stop measuring time for the critical work
-            total_time = time.perf_counter() - start
-
-            # Extract precisions and latencies (outside the timed section)
-            precisions, latencies = zip(*results)
+        total_time = time.perf_counter() - start
 
         self.__class__.delete_client()
 
@@ -161,20 +132,3 @@ def post_search(self):
     @classmethod
     def delete_client(cls):
         pass
-
-
-def chunked_iterable(iterable, size):
-    """Yield successive chunks of a given size from an iterable."""
-    it = iter(iterable)
-    while chunk := list(itertools.islice(it, size)):
-        yield chunk
-
-
-def process_chunk(chunk, search_one):
-    """Process a chunk of queries using the search_one function."""
-    return [search_one(query) for query in chunk]
-
-
-def process_chunk_wrapper(chunk, search_one):
-    """Wrapper to process a chunk of queries."""
-    return process_chunk(chunk, search_one)
```
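The search.py revert replaces the hand-rolled Process/Queue fan-out with the earlier pool-based version: get_context(...).Pool is created with init_client as the per-process initializer, queries are streamed through imap_unordered, and a single timer around the pool measures total_time. The standalone sketch below reproduces that pattern; the query data, the search_one body, and the module-level client are placeholders, not the benchmark's actual implementation:

```python
import time
from multiprocessing import get_context

# Placeholder per-process state; the real code initializes an engine client here.
_client = None


def init_client(host, distance, connection_params, search_params):
    # Runs once in every worker process (Pool initializer).
    global _client
    _client = {"host": host, "distance": distance}  # stand-in for a real client


def search_one(query):
    # Stand-in for a real search call; returns a (precision, latency) pair
    # like the benchmark's search_one does.
    start = time.perf_counter()
    assert _client is not None  # guaranteed by the pool initializer
    return 1.0, time.perf_counter() - start


if __name__ == "__main__":
    queries = [[0.1, 0.2], [0.3, 0.4], [0.5, 0.6]]  # illustrative query vectors
    parallel = 2

    ctx = get_context("spawn")  # the repo picks the start method per engine
    with ctx.Pool(
        processes=parallel,
        initializer=init_client,
        initargs=("localhost", "cosine", {}, {}),
    ) as pool:
        start = time.perf_counter()
        precisions, latencies = zip(*pool.imap_unordered(search_one, queries))
    total_time = time.perf_counter() - start

    print(f"precision={sum(precisions) / len(precisions):.3f}, total_time={total_time:.4f}s")
```

The design difference worth noting: with a Pool initializer, each worker builds its client once and reuses it for every query it pulls from imap_unordered, whereas the removed code pre-chunked the query list and pushed per-chunk results back through a Queue.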
