-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_cholera.py
251 lines (204 loc) · 8.47 KB
/
test_cholera.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
"""
Test case for Loop Modulus calculation on the Cholera dataset.
This script demonstrates how to use the loop modulus algorithm with
real geographic data and visualize the results.
Requirements:
- libpysal
- geopandas
- contextily
- matplotlib
- networkx
- numpy
Run with:
python test_cholera.py
"""
import networkx as nx
import numpy as np
import matplotlib.pyplot as plt
import time
import warnings
import os
# Suppress warnings for cleaner output
warnings.filterwarnings("ignore", category=RuntimeWarning)
warnings.filterwarnings("ignore", category=UserWarning)
# Check if required packages are installed
try:
from libpysal import weights
from libpysal.cg import voronoi_frames
import geopandas
has_geo_deps = True
except ImportError:
print("Geographic dependencies not found. To install:")
print("pip install libpysal geopandas contextily")
has_geo_deps = False
# Import our loop modulus code
from core import calculate_loop_modulus_rho_preprocessed
from plotting import plot_initial_cycles, plot_final_rho
def load_cholera_data():
"""Load the cholera dataset and create a Delaunay graph."""
try:
# Check if the data file exists
if not os.path.exists("cholera_cases.gpkg"):
print("Error: Data file 'cholera_cases.gpkg' not found.")
print("Please download the data or run this test in the correct directory.")
return None, None, None
# Load the data
cases = geopandas.read_file("cholera_cases.gpkg")
coordinates = np.column_stack((cases.geometry.x, cases.geometry.y))
# Create Voronoi diagram and Delaunay triangulation
cells, generators = voronoi_frames(coordinates, clip="convex hull")
delaunay = weights.Rook.from_dataframe(cells)
delaunay_graph = delaunay.to_networkx()
positions = dict(zip(delaunay_graph.nodes, coordinates))
return delaunay_graph, positions, cells
except Exception as e:
print(f"Error loading cholera data: {e}")
return None, None, None
def plot_delaunay_graph(delaunay_graph, positions, cells):
"""Plot the Delaunay graph with a base map."""
try:
# Create the plot
plt.figure(figsize=(12, 10))
ax = cells.plot(facecolor="lightblue", alpha=0.50, edgecolor="cornsilk", linewidth=2)
# Try to add a basemap (may fail if internet connection is unavailable)
try:
from contextily import add_basemap
add_basemap(ax)
except Exception as e:
print(f"Could not add basemap: {e}")
# Draw the graph
ax.axis("off")
nx.draw(
delaunay_graph,
positions,
ax=ax,
node_size=2,
node_color="k",
edge_color="k",
alpha=0.8,
)
plt.title("Delaunay Graph of Cholera Cases")
plt.savefig("cholera_delaunay_graph.png", dpi=300, bbox_inches='tight')
print("Saved Delaunay graph visualization to 'cholera_delaunay_graph.png'")
plt.close()
except Exception as e:
print(f"Error plotting Delaunay graph: {e}")
def plot_rho_weights_small(G, rho, edge_index):
"""Plot a small graph with the extremal densities (rho) as edge weights."""
try:
# Create figure with subplots
plt.figure(figsize=(10, 8))
ax = plt.gca()
# Create edge weights dictionary
edge_weights = {e: round(rho[edge_index[e]], 3) for e in G.edges()}
edge_labels = {e: f"{w:.3f}" for e, w in edge_weights.items()}
# Get node positions
pos = nx.spring_layout(G, seed=42)
# Draw nodes
nx.draw_networkx_nodes(G, pos, node_size=700, node_color='lightblue', ax=ax)
# Draw edges with weight-based styling
weights = [rho[edge_index[e]] * 5 for e in G.edges()] # Scale for visibility
edges = nx.draw_networkx_edges(
G, pos, width=weights, alpha=0.7,
edge_color=weights, edge_cmap=plt.cm.Blues,
ax=ax
)
# Add edge labels
nx.draw_networkx_edge_labels(
G, pos, edge_labels=edge_labels,
font_size=10, label_pos=0.5, ax=ax
)
# Add node labels
nx.draw_networkx_labels(G, pos, font_size=12, font_weight='bold', ax=ax)
# Create colorbar
sm = plt.cm.ScalarMappable(
cmap=plt.cm.Blues,
norm=plt.Normalize(vmin=min(rho), vmax=max(rho))
)
sm.set_array([])
plt.colorbar(sm, ax=ax, label='Edge Weight (ρ)')
# Customize plot
ax.set_title("Graph with Loop Modulus Edge Weights")
ax.axis('off')
plt.tight_layout()
plt.savefig("loop_modulus_weights_small.png", dpi=300, bbox_inches='tight')
print("Saved small graph with weights to 'loop_modulus_weights_small.png'")
plt.close()
except Exception as e:
print(f"Error plotting small graph with weights: {e}")
def main():
"""Run the main test case."""
print("=== Loop Modulus Test Case: Cholera Dataset ===")
# Check if we have the geographic dependencies
if not has_geo_deps:
print("\nCannot run the full test without geographic dependencies.")
print("Will create a smaller test example instead.")
# Create a small test graph
G = nx.cycle_graph(5)
G.add_edge(0, 2)
G.add_edge(0, 3)
pos = nx.spring_layout(G, seed=42)
# Create edge index mapping
edge_list = [tuple(sorted(e)) for e in G.edges()]
edge_index = {e: i for i, e in enumerate(edge_list)}
print("\n=== Running Modulus Calculation on Small Test Graph ===")
start_time = time.time()
# Run the loop modulus calculation
final_rho_star_map, edge_list, added_cycles, final_modulus, qp_solves_count, total_time = \
calculate_loop_modulus_rho_preprocessed(
G,
tolerance=1e-4,
max_iterations=100,
initial_set_target_size=5,
parallel_seeds=2,
cycles_to_add_per_iter=2,
print_interval=5
)
# Create numpy array for rho values
rho = np.zeros(len(edge_list))
for edge, idx in edge_index.items():
rho[idx] = final_rho_star_map.get(edge, 0.0)
# Plot the results
plot_rho_weights_small(G, rho, edge_index)
print(f"\nCalculation completed in {total_time:.2f}s")
print(f"Final Modulus: {final_modulus:.6f}")
print(f"QP Solves: {qp_solves_count}")
print(f"Total Constraints: {len(added_cycles)}")
return
# Load the cholera data
print("\nLoading Cholera dataset...")
delaunay_graph, positions, cells = load_cholera_data()
if delaunay_graph is None:
print("Could not load cholera data. Exiting.")
return
# Plot the Delaunay graph
print("\nPlotting Delaunay graph...")
plot_delaunay_graph(delaunay_graph, positions, cells)
# Run the loop modulus calculation
print("\n=== Running Modulus Calculation with Preprocessing and heuristics===")
start_time = time.time()
final_rho_star_map, edge_list, added_cycles, final_modulus, qp_solves_count, total_time = \
calculate_loop_modulus_rho_preprocessed(
delaunay_graph,
tolerance=1e-4 # Use same tolerance
)
# Plot the results
print("\nPlotting initial cycles and final rho values...")
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(18, 8))
# Plot initial cycles
initial_cycles = added_cycles[:min(len(added_cycles), 235)] # First cycles are from preprocessing
plot_initial_cycles(delaunay_graph, positions, initial_cycles, ax=ax1,
title=f"Initial Cycle Set (n={len(initial_cycles)})")
# Plot final rho* values
plot_final_rho(delaunay_graph, positions, final_rho_star_map, edge_list, ax=ax2,
title=f"Final Rho* Values (Mod2≈{final_modulus:.2f})")
plt.tight_layout()
plt.savefig("cholera_loop_modulus_results.png", dpi=300, bbox_inches='tight')
print("Saved results visualization to 'cholera_loop_modulus_results.png'")
plt.close()
# Report the final timing
print(f"\nTotal runtime: {time.time() - start_time:.2f}s")
print(f"Algorithm time: {total_time:.2f}s")
print("Test completed successfully!")
if __name__ == "__main__":
main()