Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

## Unreleased
- Admin: Added XMover - CrateDB shard analyzer and movement tool. Thanks, @WalBeh.

## 2025/08/19 v0.0.41
- I/O: Updated to `influxio-0.6.0`. Thanks, @ZillKhan.
Expand Down
Empty file.
10 changes: 10 additions & 0 deletions cratedb_toolkit/admin/xmover/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"""
XMover - CrateDB Shard Analyzer and Movement Tool

A tool for analyzing CrateDB shard distribution across nodes and availability zones,
and generating safe SQL commands for shard rebalancing.
"""

# Package metadata for XMover: version string, author label, and a one-line description.
__version__ = "0.1.0"
__author__ = "CrateDB Tools"
__description__ = "CrateDB shard analyzer and movement tool"
Empty file.
949 changes: 949 additions & 0 deletions cratedb_toolkit/admin/xmover/analysis/shard.py

Large diffs are not rendered by default.

795 changes: 795 additions & 0 deletions cratedb_toolkit/admin/xmover/analysis/table.py

Large diffs are not rendered by default.

159 changes: 159 additions & 0 deletions cratedb_toolkit/admin/xmover/analysis/zone.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
from typing import Dict, List, Optional

from rich import box
from rich.console import Console
from rich.panel import Panel
from rich.table import Table

from cratedb_toolkit.admin.xmover.analysis.shard import ShardAnalyzer
from cratedb_toolkit.admin.xmover.model import ShardInfo
from cratedb_toolkit.admin.xmover.util.database import CrateDBClient

console = Console()


class ZoneReport:
    """Print zone-oriented shard reports for a CrateDB cluster to the console.

    Holds the database client it is given plus a ``ShardAnalyzer`` built from
    that client, and renders the results as rich tables.
    """

    def __init__(self, client: CrateDBClient):
        """Keep the client and create the shard analyzer that uses it."""
        self.client = client
        self.analyzer = ShardAnalyzer(self.client)

    def shard_balance(self, tolerance: float, table: Optional[str] = None):
        """Check zone balance for shards.

        Prints one row per zone with its primary, replica and total shard
        counts, and marks each zone as balanced, under or over relative to the
        expected per-zone share.

        Args:
            tolerance: Allowed deviation from the expected per-zone share, in percent.
            table: Optional table name forwarded to the analyzer.
        """
        console.print(Panel.fit("[bold blue]Zone Balance Check[/bold blue]"))
        console.print("[dim]Note: Analyzing all shards regardless of state for complete cluster view[/dim]")
        console.print()

        # NOTE(review): tolerance is forwarded to the analyzer and also applied
        # below; confirm the analyzer does not apply it a second time.
        zone_stats = self.analyzer.check_zone_balance(table, tolerance)

        if not zone_stats:
            console.print("[yellow]No shards found for analysis[/yellow]")
            return

        # Expected share per zone as a float. Floor division would shift the
        # whole tolerance window downward whenever the total is not divisible
        # by the number of zones, mislabelling zones for small totals.
        # zone_stats is non-empty here, so the division is safe.
        total_shards = sum(stats["TOTAL"] for stats in zone_stats.values())
        expected_per_zone = total_shards / len(zone_stats)
        lower_bound = expected_per_zone * (1 - tolerance / 100)
        upper_bound = expected_per_zone * (1 + tolerance / 100)

        balance_table = Table(
            title=f"Zone Balance Analysis (Target: {expected_per_zone:.1f} ±{tolerance}%)",
            box=box.ROUNDED,
        )
        balance_table.add_column("Zone", style="cyan")
        balance_table.add_column("Primary", justify="right", style="blue")
        balance_table.add_column("Replica", justify="right", style="green")
        balance_table.add_column("Total", justify="right", style="magenta")
        balance_table.add_column("Status", style="bold")

        for zone, stats in zone_stats.items():
            total = stats["TOTAL"]

            if lower_bound <= total <= upper_bound:
                status = "[green]✓ Balanced[/green]"
            elif total < lower_bound:
                status = f"[yellow]⚠ Under ({total - expected_per_zone:+.1f})[/yellow]"
            else:
                status = f"[red]⚠ Over ({total - expected_per_zone:+.1f})[/red]"

            balance_table.add_row(zone, str(stats["PRIMARY"]), str(stats["REPLICA"]), str(total), status)

        console.print(balance_table)

    def distribution_conflicts(self, shard_details: bool = False, table: Optional[str] = None):
        """Detailed analysis of zone distribution and potential conflicts.

        Groups shard copies by ``schema.table`` and shard id, prints one
        distribution table per database table, and finishes with a summary of
        zone conflicts and under-replicated shards.

        Args:
            shard_details: When true, also print one line per shard copy
                (type, node, zone, routing state).
            table: Optional table name passed to the client when fetching shards.
        """
        console.print(Panel.fit("[bold blue]Detailed Zone Analysis[/bold blue]"))
        console.print("[dim]Comprehensive zone distribution analysis for CrateDB cluster[/dim]")
        console.print()

        # Get all shards for analysis
        shards = self.client.get_shards_info(table_name=table, for_analysis=True)

        if not shards:
            console.print("[yellow]No shards found for analysis[/yellow]")
            return

        # Organize by table and shard
        # NOTE(review): copies are keyed by shard id only; if shard ids repeat
        # within one table (e.g. per partition), they would be merged — confirm.
        tables: Dict[str, Dict[int, List[ShardInfo]]] = {}
        for shard in shards:
            table_key = f"{shard.schema_name}.{shard.table_name}"
            tables.setdefault(table_key, {}).setdefault(shard.shard_id, []).append(shard)

        # Analyze each table
        zone_conflicts = 0
        under_replicated = 0

        for table_name, table_shards in tables.items():
            console.print(f"\n[bold cyan]Table: {table_name}[/bold cyan]")

            # Create analysis table
            analysis_table = Table(title=f"Shard Distribution for {table_name}", box=box.ROUNDED)
            analysis_table.add_column("Shard ID", justify="right", style="magenta")
            analysis_table.add_column("Primary Zone", style="blue")
            analysis_table.add_column("Replica Zones", style="green")
            analysis_table.add_column("Total Copies", justify="right", style="cyan")
            analysis_table.add_column("Status", style="bold")

            for shard_id, shard_copies in sorted(table_shards.items()):
                primary_zone = "Unknown"
                replica_zones = set()
                total_copies = len(shard_copies)
                zones_with_copies = set()

                for shard_copy in shard_copies:
                    zones_with_copies.add(shard_copy.zone)
                    if shard_copy.is_primary:
                        primary_zone = shard_copy.zone
                    else:
                        replica_zones.add(shard_copy.zone)

                # Determine status
                status_parts = []
                # All copies of this shard sit in a single zone.
                if len(zones_with_copies) == 1:
                    zone_conflicts += 1
                    status_parts.append("[red]⚠ ZONE CONFLICT[/red]")

                if total_copies < 2:  # Assuming we want at least 1 replica
                    under_replicated += 1
                    status_parts.append("[yellow]⚠ Under-replicated[/yellow]")

                if not status_parts:
                    status_parts.append("[green]✓ Good[/green]")

                replica_zones_str = ", ".join(sorted(replica_zones)) if replica_zones else "None"

                analysis_table.add_row(
                    str(shard_id), primary_zone, replica_zones_str, str(total_copies), " ".join(status_parts)
                )

                # Show individual shard details if requested.
                # These lines are printed immediately, so they appear before
                # the table, which is only printed after this loop.
                if shard_details:
                    for shard_copy in shard_copies:
                        health_indicator = "✓" if shard_copy.routing_state == "STARTED" else "⚠"
                        console.print(
                            f" {health_indicator} {shard_copy.shard_type} "
                            f"on {shard_copy.node_name} ({shard_copy.zone}) - {shard_copy.routing_state}"
                        )

            console.print(analysis_table)

        # Summary
        console.print("\n[bold]Zone Analysis Summary:[/bold]")
        console.print(f" • Tables analyzed: [cyan]{len(tables)}[/cyan]")
        console.print(f" • Zone conflicts detected: [red]{zone_conflicts}[/red]")
        console.print(f" • Under-replicated shards: [yellow]{under_replicated}[/yellow]")

        if zone_conflicts > 0:
            console.print(f"\n[red]⚠ Found {zone_conflicts} zone conflicts that need attention![/red]")
            console.print("[dim]Zone conflicts occur when all copies of a shard are in the same zone.[/dim]")
            console.print("[dim]This violates CrateDB's zone-awareness and creates availability risks.[/dim]")

        if under_replicated > 0:
            console.print(f"\n[yellow]⚠ Found {under_replicated} under-replicated shards.[/yellow]")
            console.print("[dim]Consider increasing replication for better availability.[/dim]")

        if zone_conflicts == 0 and under_replicated == 0:
            console.print("\n[green]✓ No critical zone distribution issues detected![/green]")
118 changes: 118 additions & 0 deletions cratedb_toolkit/admin/xmover/attic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
# ruff: noqa

# @main.command()
# @click.argument('node_name')
# @click.option('--min-free-space', default=100.0, help='Minimum free space required on target nodes in GB (default: 100)')
# @click.option('--dry-run/--execute', default=True, help='Show decommission plan without generating SQL commands (default: True)')
# @click.pass_context
# def decommission(ctx, node_name: str, min_free_space: float, dry_run: bool):
# """Plan decommissioning of a node by analyzing required shard moves
#
# NODE_NAME: Name of the node to decommission
# """
# client = ctx.obj['client']
# analyzer = ShardAnalyzer(client)
#
# mode_text = "PLANNING MODE" if dry_run else "EXECUTION MODE"
# console.print(Panel.fit(f"[bold blue]Node Decommission Analysis[/bold blue] - [bold {'green' if dry_run else 'red'}]{mode_text}[/bold {'green' if dry_run else 'red'}]"))
# console.print(f"[dim]Analyzing decommission plan for node: {node_name}[/dim]")
# console.print()
#
# # Generate decommission plan
# plan = analyzer.plan_node_decommission(node_name, min_free_space)
#
# if 'error' in plan:
# console.print(f"[red]Error: {plan['error']}[/red]")
# return
#
# # Display plan summary
# summary_table = Table(title=f"Decommission Plan for {node_name}", box=box.ROUNDED)
# summary_table.add_column("Metric", style="cyan")
# summary_table.add_column("Value", style="magenta")
#
# summary_table.add_row("Node", plan['node'])
# summary_table.add_row("Zone", plan['zone'])
# summary_table.add_row("Feasible", "[green]✓ Yes[/green]" if plan['feasible'] else "[red]✗ No[/red]")
# summary_table.add_row("Shards to Move", str(plan['shards_to_move']))
# summary_table.add_row("Moveable Shards", str(plan['moveable_shards']))
# summary_table.add_row("Total Data Size", format_size(plan['total_size_gb']))
# summary_table.add_row("Estimated Time", f"{plan['estimated_time_hours']:.1f} hours")
#
# console.print(summary_table)
# console.print()
#
# # Show warnings if any
# if plan['warnings']:
# console.print("[bold yellow]⚠ Warnings:[/bold yellow]")
# for warning in plan['warnings']:
# console.print(f" • [yellow]{warning}[/yellow]")
# console.print()
#
# # Show infeasible moves if any
# if plan['infeasible_moves']:
# console.print("[bold red]✗ Cannot Move:[/bold red]")
# infeasible_table = Table(box=box.ROUNDED)
# infeasible_table.add_column("Shard", style="cyan")
# infeasible_table.add_column("Size", style="magenta")
# infeasible_table.add_column("Reason", style="red")
#
# for move in plan['infeasible_moves']:
# infeasible_table.add_row(
# move['shard'],
# format_size(move['size_gb']),
# move['reason']
# )
# console.print(infeasible_table)
# console.print()
#
# # Show move recommendations
# if plan['recommendations']:
# move_table = Table(title="Required Shard Moves", box=box.ROUNDED)
# move_table.add_column("Table", style="cyan")
# move_table.add_column("Shard", justify="right", style="magenta")
# move_table.add_column("Type", style="blue")
# move_table.add_column("Size", style="green")
# move_table.add_column("From Zone", style="yellow")
# move_table.add_column("To Node", style="cyan")
# move_table.add_column("To Zone", style="yellow")
#
# for rec in plan['recommendations']:
# move_table.add_row(
# f"{rec.schema_name}.{rec.table_name}",
# str(rec.shard_id),
# rec.shard_type,
# format_size(rec.size_gb),
# rec.from_zone,
# rec.to_node,
# rec.to_zone
# )
#
# console.print(move_table)
# console.print()
#
# # Generate SQL commands if not in dry-run mode
# if not dry_run and plan['feasible']:
# console.print(Panel.fit("[bold green]Decommission SQL Commands[/bold green]"))
# console.print("[dim]# Execute these commands in order to prepare for node decommission[/dim]")
# console.print("[dim]# ALWAYS test in a non-production environment first![/dim]")
# console.print("[dim]# Monitor shard health after each move before proceeding[/dim]")
# console.print()
#
# for i, rec in enumerate(plan['recommendations'], 1):
# console.print(f"-- Move {i}: {rec.reason}")
# console.print(f"{rec.to_sql()}")
# console.print()
#
# console.print(f"-- After all moves complete, the node {node_name} can be safely removed")
# console.print(f"-- Total moves required: {len(plan['recommendations'])}")
# elif dry_run:
# console.print("[green]✓ Decommission plan ready. Use --execute to generate SQL commands.[/green]")
#
# # Final status
# if not plan['feasible']:
# console.print(f"[red]⚠ Node {node_name} cannot be safely decommissioned at this time.[/red]")
# console.print("[dim]Address the issues above before attempting decommission.[/dim]")
# elif plan['shards_to_move'] == 0:
# console.print(f"[green]✓ Node {node_name} is ready for immediate decommission (no shards to move).[/green]")
# else:
# console.print(f"[green]✓ Node {node_name} can be safely decommissioned after moving {len(plan['recommendations'])} shards.[/green]")
Loading
Loading