5
5
and provide recommendations for optimization.
6
6
"""
7
7
8
+ import logging
8
9
import statistics
9
10
from dataclasses import dataclass
10
11
from typing import Any , Dict , List , Optional , Tuple
15
16
16
17
from cratedb_toolkit .admin .xmover .util .database import CrateDBClient
17
18
19
+ logger = logging .getLogger (__name__ )
20
+
18
21
19
22
def format_storage_size (size_gb : float ) -> str :
20
23
"""Format storage size with appropriate units and spacing"""
@@ -134,7 +137,7 @@ def get_table_distribution_detailed(self, table_identifier: str) -> Optional[Tab
134
137
AND s.routing_state = 'STARTED'
135
138
GROUP BY s.schema_name, s.table_name, s.node['name']
136
139
ORDER BY s.node['name'] \
137
- """
140
+ """ # noqa: E501
138
141
139
142
result = self .client .execute_query (query , [schema_name , table_name ])
140
143
rows = result .get ("rows" , [])
@@ -190,7 +193,8 @@ def format_table_health_report(self, table_dist: TableDistribution) -> None:
190
193
rprint (f"• Total Shards: { total_shards } ({ total_primary_shards } primary + { total_replica_shards } replica)" )
191
194
rprint (f"• Total Documents: { total_documents :,} " )
192
195
rprint (
193
- f"• Node Coverage: { len (table_nodes )} /{ len (cluster_nodes )} nodes ({ len (table_nodes ) / len (cluster_nodes ) * 100 :.0f} %)"
196
+ f"• Node Coverage: { len (table_nodes )} /{ len (cluster_nodes )} nodes "
197
+ f"({ len (table_nodes ) / len (cluster_nodes ) * 100 :.0f} %)"
194
198
)
195
199
196
200
if missing_nodes :
@@ -261,7 +265,8 @@ def format_table_health_report(self, table_dist: TableDistribution) -> None:
261
265
# Storage distribution analysis
262
266
if storage_cv > 0.4 :
263
267
rprint (
264
- f"• [red]⚠ Storage Imbalance:[/red] Range { format_storage_size (min_storage )} -{ format_storage_size (max_storage )} per node (CV: { storage_cv :.2f} )"
268
+ f"• [red]⚠ Storage Imbalance:[/red] Range "
269
+ f"{ format_storage_size (min_storage )} -{ format_storage_size (max_storage )} per node (CV: { storage_cv :.2f} )"
265
270
)
266
271
else :
267
272
rprint (f"• [green]✓ Storage Balance:[/green] Well distributed (CV: { storage_cv :.2f} )" )
@@ -306,11 +311,13 @@ def format_table_health_report(self, table_dist: TableDistribution) -> None:
306
311
for zone in sorted (zone_distribution .keys ()):
307
312
zone_data = zone_distribution [zone ]
308
313
rprint (
309
- f"• { zone } : { zone_data ['nodes' ]} nodes, { zone_data ['shards' ]} shards, { format_storage_size (zone_data ['size' ])} "
314
+ f"• { zone } : { zone_data ['nodes' ]} nodes, "
315
+ f"{ zone_data ['shards' ]} shards, { format_storage_size (zone_data ['size' ])} "
310
316
)
311
317
312
318
except Exception :
313
- pass # Zone info not available
319
+ # Zone info not available
320
+ logger .exception ("Zone info not available" )
314
321
315
322
# Health Summary
316
323
rprint ("\n [bold]💊 Health Summary[/bold]" )
@@ -375,7 +382,7 @@ def get_largest_tables_distribution(self, top_n: int = 10) -> List[TableDistribu
375
382
WHERE s.routing_state = 'STARTED'
376
383
GROUP BY s.schema_name, s.table_name, s.node['name']
377
384
ORDER BY s.schema_name, s.table_name, s.node['name'] \
378
- """
385
+ """ # noqa: E501
379
386
380
387
result = self .client .execute_query (query , [top_n ])
381
388
@@ -534,7 +541,8 @@ def detect_storage_imbalance(self, table: TableDistribution) -> Optional[Distrib
534
541
535
542
if overloaded_node and underloaded_node :
536
543
recommendations .append (
537
- f"Rebalance storage from { overloaded_node } ({ format_storage_size (max_size )} ) to { underloaded_node } ({ format_storage_size (min_size )} )"
544
+ f"Rebalance storage from { overloaded_node } ({ format_storage_size (max_size )} ) "
545
+ f"to { underloaded_node } ({ format_storage_size (min_size )} )"
538
546
)
539
547
540
548
return DistributionAnomaly (
@@ -643,7 +651,7 @@ def detect_document_imbalance(self, table: TableDistribution) -> Optional[Distri
643
651
recommendations = recommendations ,
644
652
)
645
653
646
- def analyze_distribution (self , top_tables : int = 10 ) -> List [DistributionAnomaly ]:
654
+ def analyze_distribution (self , top_tables : int = 10 ) -> Tuple [ List [DistributionAnomaly ], int ]:
647
655
"""Analyze shard distribution and return ranked anomalies"""
648
656
649
657
# Get table distributions
@@ -672,12 +680,13 @@ def format_distribution_report(self, anomalies: List[DistributionAnomaly], table
672
680
673
681
if not anomalies :
674
682
rprint (
675
- f"[green]✓ No significant shard distribution anomalies detected in top { tables_analyzed } tables![/green]"
683
+ f"[green]✓ No significant shard distribution anomalies "
684
+ f"detected in top { tables_analyzed } tables![/green]"
676
685
)
677
686
return
678
687
679
688
# Show analysis scope
680
- unique_tables = set ( anomaly .table .full_table_name for anomaly in anomalies )
689
+ unique_tables = { anomaly .table .full_table_name for anomaly in anomalies }
681
690
rprint (
682
691
f"[blue]📋 Analyzed { tables_analyzed } largest tables, found issues in { len (unique_tables )} tables[/blue]"
683
692
)
@@ -731,7 +740,8 @@ def format_distribution_report(self, anomalies: List[DistributionAnomaly], table
731
740
overloaded = [node for node , count in counts .items () if count == max_count ]
732
741
underloaded = [node for node , count in counts .items () if count == min_count ]
733
742
rprint (
734
- f" [red]⚠ Issue:[/red] { overloaded [0 ]} has { max_count } shards while { underloaded [0 ]} has only { min_count } shards"
743
+ f" [red]⚠ Issue:[/red] { overloaded [0 ]} has { max_count } shards "
744
+ f"while { underloaded [0 ]} has only { min_count } shards"
735
745
)
736
746
737
747
elif anomaly .anomaly_type == "Storage Imbalance" :
@@ -742,19 +752,20 @@ def format_distribution_report(self, anomalies: List[DistributionAnomaly], table
742
752
overloaded = [node for node , size in sizes .items () if size == max_size ][0 ]
743
753
underloaded = [node for node , size in sizes .items () if size == min_size ][0 ]
744
754
rprint (
745
- f" [red]⚠ Issue:[/red] Storage ranges from { format_storage_size (min_size )} ({ underloaded } ) to { format_storage_size (max_size )} ({ overloaded } ) - { max_size / min_size :.1f} x difference"
755
+ f" [red]⚠ Issue:[/red] Storage ranges from { format_storage_size (min_size )} ({ underloaded } ) " # noqa: E501
756
+ f"to { format_storage_size (max_size )} ({ overloaded } ) - { max_size / min_size :.1f} x difference"
746
757
)
747
758
748
759
elif anomaly .anomaly_type == "Node Coverage Issue" :
749
760
if "nodes_without_shards" in anomaly .details :
750
761
missing_nodes = anomaly .details ["nodes_without_shards" ]
751
762
coverage_ratio = anomaly .details ["coverage_ratio" ]
752
763
rprint (
753
- f" [red]⚠ Issue:[/red] Table missing from { len (missing_nodes )} nodes ({ coverage_ratio :.0%} cluster coverage)"
754
- )
755
- rprint (
756
- f" [dim] Missing from: { ', ' .join (missing_nodes [:3 ])} { '...' if len (missing_nodes ) > 3 else '' } [/dim]"
764
+ f" [red]⚠ Issue:[/red] Table missing from { len (missing_nodes )} nodes "
765
+ f"({ coverage_ratio :.0%} cluster coverage)"
757
766
)
767
+ ellipsis = "..." if len (missing_nodes ) > 3 else ""
768
+ rprint (f" [dim] Missing from: { ', ' .join (missing_nodes [:3 ])} { ellipsis } [/dim]" )
758
769
759
770
elif anomaly .anomaly_type == "Document Imbalance" :
760
771
if "document_counts" in anomaly .details :
@@ -763,7 +774,8 @@ def format_distribution_report(self, anomalies: List[DistributionAnomaly], table
763
774
max_docs = max (doc_counts .values ())
764
775
ratio = max_docs / min_docs if min_docs > 0 else float ("inf" )
765
776
rprint (
766
- f" [red]⚠ Issue:[/red] Document counts range from { min_docs :,} to { max_docs :,} ({ ratio :.1f} x difference)"
777
+ f" [red]⚠ Issue:[/red] Document counts range "
778
+ f"from { min_docs :,} to { max_docs :,} ({ ratio :.1f} x difference)"
767
779
)
768
780
769
781
# Show recommendations
@@ -772,7 +784,7 @@ def format_distribution_report(self, anomalies: List[DistributionAnomaly], table
772
784
rprint (f" • { rec } " )
773
785
774
786
# Summary statistics
775
- unique_tables = set ( anomaly .table .full_table_name for anomaly in anomalies )
787
+ unique_tables = { anomaly .table .full_table_name for anomaly in anomalies }
776
788
rprint ("\n [dim]📊 Analysis Summary:[/dim]" )
777
789
rprint (f"[dim]• Tables analyzed: { tables_analyzed } [/dim]" )
778
790
rprint (f"[dim]• Tables with issues: { len (unique_tables )} [/dim]" )
0 commit comments