bftron107.py
import os
import ecdsa
import hashlib
import base58
from tronpy import Tron
from Crypto.Hash import keccak  # pycryptodome; used for Tron's Keccak-256 address hashing
from colorama import init, Fore
import math
import multiprocessing
from multiprocessing import Pool, Manager
from tqdm import tqdm
import logging
import json
import time
from pathlib import Path
# Initialize Tron client
client = Tron()
# Initialize colorama and logging
init(autoreset=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(message)s',
    handlers=[
        logging.FileHandler('bftron.log'),
        logging.StreamHandler()
    ]
)
# Sample private key with desired characteristics
SAMPLE_PRIVATE_KEY = bytes.fromhex('597fb3998b6470ed35dcd5074e5bd3a2d9f15ef4bb56c1639dceb0626476045e')
# Target metrics with wider tolerances
TARGET_ENTROPY = 4.88
TARGET_BIT_DIVERSITY = 53.52
ENTROPY_TOLERANCE = 4.0
BIT_DIVERSITY_TOLERANCE = 4.0
# State management
STATE_FILE = 'bftron_state.json'
CHECKPOINT_INTERVAL = 10 # Save state every 10 addresses
class StateManager:
    """Persists search progress to disk so an interrupted run can resume."""

    def __init__(self, state_file):
        self.state_file = state_file
        self.state = self.load_state()

    def load_state(self):
        if Path(self.state_file).exists():
            try:
                with open(self.state_file, 'r') as f:
                    return json.load(f)
            except (OSError, json.JSONDecodeError):
                return self.get_initial_state()
        return self.get_initial_state()

    def get_initial_state(self):
        return {
            'last_address_index': 0,
            'processed_addresses': [],
            'total_attempts': 0,
            'last_update': time.time()
        }

    def save_state(self):
        self.state['last_update'] = time.time()
        with open(self.state_file, 'w') as f:
            json.dump(self.state, f)

    def update_progress(self, address_index, address):
        self.state['last_address_index'] = address_index
        if address not in self.state['processed_addresses']:
            self.state['processed_addresses'].append(address)
        self.state['total_attempts'] += 1
        if address_index % CHECKPOINT_INTERVAL == 0:
            self.save_state()
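# Illustrative shape of the on-disk state file (bftron_state.json) written by
# StateManager.save_state(); the values below are hypothetical:
# {
#     "last_address_index": 3,
#     "processed_addresses": ["T...", "T...", "T..."],
#     "total_attempts": 3,
#     "last_update": 1700000000.0
# }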
def analyze_sample_key():
    entropy = calculate_entropy(SAMPLE_PRIVATE_KEY)
    bit_diversity = calculate_bit_diversity(SAMPLE_PRIVATE_KEY)
    byte_pattern = get_byte_pattern(SAMPLE_PRIVATE_KEY)
    return entropy, bit_diversity, byte_pattern

def get_byte_pattern(data):
    """Analyze byte patterns in the private key"""
    patterns = {
        'high_bytes': sum(1 for b in data if b > 127),
        'low_bytes': sum(1 for b in data if b < 128),
        'zero_bytes': sum(1 for b in data if b == 0),
        'byte_ranges': [sum(1 for b in data if lower <= b < upper)
                        for lower, upper in [(0, 64), (64, 128), (128, 192), (192, 256)]]
    }
    return patterns
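# Example result shape for a 32-byte key (counts are hypothetical but internally
# consistent: high_bytes + low_bytes == 32 and byte_ranges sums to 32):
# {'high_bytes': 15, 'low_bytes': 17, 'zero_bytes': 0, 'byte_ranges': [8, 9, 7, 8]}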
def generate_private_key():
    # Standalone helper: rejection-samples random keys until both the entropy and
    # bit-diversity targets and the sample key's byte-range pattern are matched.
    # (The multiprocessing search path below uses its own inline version.)
    while True:
        key = os.urandom(32)
        entropy = calculate_entropy(key)
        bit_diversity = calculate_bit_diversity(key)
        if (abs(entropy - TARGET_ENTROPY) <= ENTROPY_TOLERANCE and
                abs(bit_diversity - TARGET_BIT_DIVERSITY) <= BIT_DIVERSITY_TOLERANCE):
            key_pattern = get_byte_pattern(key)
            sample_pattern = get_byte_pattern(SAMPLE_PRIVATE_KEY)
            pattern_match = (
                abs(key_pattern['high_bytes'] - sample_pattern['high_bytes']) <= 4 and
                abs(key_pattern['low_bytes'] - sample_pattern['low_bytes']) <= 4 and
                all(abs(k - s) <= 4 for k, s in zip(key_pattern['byte_ranges'],
                                                    sample_pattern['byte_ranges']))
            )
            if pattern_match:
                return key
def private_key_to_public_key(private_key):
    # Returns the 64-byte uncompressed public key (x || y) on secp256k1.
    sk = ecdsa.SigningKey.from_string(private_key, curve=ecdsa.SECP256k1)
    return sk.get_verifying_key().to_string()

def public_key_to_address(public_key):
    # Tron derives addresses from Keccak-256 (not SHA-256/RIPEMD-160, which is the
    # Bitcoin scheme): hash the 64-byte public key, keep the last 20 bytes, prefix
    # with 0x41, then Base58Check-encode with a double-SHA-256 checksum.
    keccak_hash = keccak.new(digest_bits=256)
    keccak_hash.update(public_key)
    address = b'\x41' + keccak_hash.digest()[-20:]
    checksum = hashlib.sha256(hashlib.sha256(address).digest()).digest()[:4]
    return base58.b58encode(address + checksum)
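# The returned value is a Base58Check byte string; mainnet Tron addresses encode the
# 0x41 prefix as a leading 'T' and are 34 characters long once decoded to str.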
def check_address_balance(address):
    try:
        balance = client.get_account_balance(address)
        return balance
    except Exception as e:
        logging.error(f"Error checking balance: {e}")
        return None
def calculate_entropy(data):
    """Shannon entropy of the byte-frequency distribution, in bits."""
    byte_counts = {}
    for byte in data:
        byte_counts[byte] = byte_counts.get(byte, 0) + 1
    entropy = 0
    for count in byte_counts.values():
        probability = count / len(data)
        entropy -= probability * math.log2(probability)
    return entropy
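# Note: with only 32 bytes of input, at most 32 distinct byte values can occur, so the
# byte-frequency entropy computed here is capped at log2(32) = 5 bits; the 4.88-bit
# target therefore sits near that ceiling, and with a ±4.0 tolerance essentially any
# random 32-byte key falls inside the accepted window.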
def calculate_bit_diversity(data):
    """Percentage of bits set to 1 across the key (a rough balance measure)."""
    bits = ''.join(format(byte, '08b') for byte in data)
    total_bits = len(bits)
    # Count set bits; len(set(bits)) would only ever be 1 or 2 (the characters '0'
    # and '1') and could never approach the 53.52% target, so use the ones-ratio.
    ones = bits.count('1')
    return (ones / total_bits) * 100
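# Under the ones-ratio reading above, the 53.52% target corresponds to roughly
# 137 of the key's 256 bits being set (0.5352 * 256 ≈ 137).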
def process_chunk(args):
    try:
        target_address, start_idx, chunk_size, result_queue, progress_queue = args
        local_progress = 0
        progress_update_interval = 1000  # Update progress every 1000 attempts
        for i in range(chunk_size):
            try:
                # Generate random private key
                private_key = os.urandom(32)
                entropy = calculate_entropy(private_key)
                # Only proceed if entropy is within range
                if abs(entropy - TARGET_ENTROPY) <= ENTROPY_TOLERANCE:
                    bit_diversity = calculate_bit_diversity(private_key)
                    if abs(bit_diversity - TARGET_BIT_DIVERSITY) <= BIT_DIVERSITY_TOLERANCE:
                        try:
                            # Generate public key and address
                            public_key = private_key_to_public_key(private_key)
                            generated_address = public_key_to_address(public_key).decode()
                            # Check if we found a match
                            if generated_address == target_address:
                                try:
                                    balance = check_address_balance(target_address)
                                    result = {
                                        'address': target_address,
                                        'private_key': private_key.hex(),
                                        'balance': balance,
                                        'entropy': entropy,
                                        'bit_diversity': bit_diversity
                                    }
                                    result_queue.put(result)
                                    return result
                                except Exception as e:
                                    logging.error(f"Error checking balance: {e}")
                            # Log interesting matches
                            if abs(entropy - TARGET_ENTROPY) <= ENTROPY_TOLERANCE / 2:
                                logging.info(f'\nClose match: {private_key.hex()}')
                                logging.info(f'Entropy: {entropy:.2f}, Diversity: {bit_diversity:.2f}%')
                        except Exception as e:
                            logging.error(f"Error in key conversion: {e}")
                            continue
                # Update progress
                local_progress += 1
                if local_progress % progress_update_interval == 0:
                    progress_queue.put(progress_update_interval)
                    local_progress = 0
            except Exception as e:
                logging.error(f"Error in attempt {i}: {e}")
                continue
        # Send remaining progress
        if local_progress > 0:
            progress_queue.put(local_progress)
    except Exception as e:
        logging.error(f"Critical error in process_chunk: {e}")
    return None
def process_address(address, attempts_per_address, num_processes):
    chunk_size = attempts_per_address // num_processes
    manager = Manager()
    result_queue = manager.Queue()
    progress_queue = manager.Queue()
    chunks = [(address, i * chunk_size, chunk_size, result_queue, progress_queue)
              for i in range(num_processes)]
    with Pool(processes=num_processes) as pool:
        total_progress = 0
        with tqdm(total=attempts_per_address, desc=f"Processing {address[:8]}...") as pbar:
            async_results = [pool.apply_async(process_chunk, (chunk,)) for chunk in chunks]
            while True:
                try:
                    # Update progress bar
                    while not progress_queue.empty():
                        progress = progress_queue.get_nowait()
                        pbar.update(progress)
                        total_progress += progress
                    # Check if any process found a match
                    if not result_queue.empty():
                        result = result_queue.get_nowait()
                        if result:
                            return result
                    # Check if all processes are done
                    if all(r.ready() for r in async_results):
                        results = [r.get() for r in async_results]
                        if any(results):
                            return next(r for r in results if r)
                        break
                    time.sleep(0.1)  # Prevent CPU overload
                except Exception as e:
                    logging.error(f"Error in progress tracking: {e}")
                    continue
    return None
def main(input_file, output_file):
    state_manager = StateManager(STATE_FILE)
    sample_entropy, sample_bit_diversity, sample_pattern = analyze_sample_key()
    logging.info("\nSample Key Analysis:")
    logging.info(f"Sample Key: {SAMPLE_PRIVATE_KEY.hex()}")
    logging.info(f"Entropy: {sample_entropy:.2f} bits")
    logging.info(f"Bit Diversity: {sample_bit_diversity:.2f}%")
    logging.info(f"Byte Pattern: {sample_pattern}\n")
    logging.info("Target Metrics:")
    logging.info(f"Entropy: {TARGET_ENTROPY} ± {ENTROPY_TOLERANCE} bits")
    logging.info(f"Bit Diversity: {TARGET_BIT_DIVERSITY} ± {BIT_DIVERSITY_TOLERANCE}%\n")
    with open(input_file, 'r') as file:
        all_addresses = [addr.strip() for addr in file.readlines() if addr.strip()]
    # Resume from last position
    start_index = state_manager.state['last_address_index']
    addresses = all_addresses[start_index:]
    total_addresses = len(addresses)
    logging.info(f"Resuming from address {start_index + 1}")
    logging.info(f"Remaining addresses to process: {total_addresses}\n")
    num_processes = 4
    attempts_per_address = 1000
    total_matches = 0
    logging.info(f"Using {num_processes} CPU cores for parallel processing")
    output_mode = 'a' if start_index > 0 else 'w'
    with open(output_file, output_mode) as out_file:
        try:
            for index, address in enumerate(addresses, start=start_index + 1):
                logging.info(f"\nProcessing address {index}/{len(all_addresses)}: {address}")
                result = process_address(address, attempts_per_address, num_processes)
                state_manager.update_progress(index, address)
                if result:
                    total_matches += 1
                    logging.info(
                        f'Match found! Address: {result["address"]}, '
                        f'Private Key: {result["private_key"]}, '
                        f'Balance: {result["balance"]}'
                    )
                    logging.info(f'Entropy: {result["entropy"]:.2f} bits')
                    logging.info(f'Bit Diversity: {result["bit_diversity"]:.2f}%')
                    out_file.write(
                        f'Address: {result["address"]}, '
                        f'Private Key: {result["private_key"]}, '
                        f'Balance: {result["balance"]}, '
                        f'Entropy: {result["entropy"]:.2f}, '
                        f'Bit Diversity: {result["bit_diversity"]:.2f}\n'
                    )
                    out_file.flush()  # Ensure immediate write
                else:
                    logging.info(f'No match found for address: {address}')
        except KeyboardInterrupt:
            logging.info("\nProcess interrupted by user. Progress saved.")
            state_manager.save_state()
            return
        except Exception as e:
            logging.error(f"\nError occurred: {e}")
            state_manager.save_state()
            raise
    state_manager.save_state()
    logging.info(f"\nProcessing complete. Total matches found: {total_matches}")
    logging.info(f"Results saved to {output_file}")
if __name__ == "__main__":
    input_file = 'tron.txt'
    output_file = 'found'
    main(input_file, output_file)
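# Usage sketch (file names match the defaults above; adjust as needed):
#   1. Put one Base58 Tron address per line in tron.txt.
#   2. Run:  python bftron107.py
#   3. Any matches are appended to the 'found' file, and progress is checkpointed in
#      bftron_state.json so an interrupted run resumes where it left off.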