import requests import sqlite3 import time import json from typing import Optional, List, Dict class P2PKBlockchainScanner: def __init__(self, db_path: str = "bitcoin_p2pk.db"): """Inizializza lo scanner con connessione al database""" self.db_path = db_path self.api_base = "https://mempool.space/api" self.init_database() def init_database(self): """Crea il database SQLite per memorizzare i P2PK""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS p2pk_addresses ( id INTEGER PRIMARY KEY AUTOINCREMENT, block_height INTEGER NOT NULL, txid TEXT NOT NULL, output_index INTEGER NOT NULL, scriptpubkey TEXT NOT NULL, value_satoshi INTEGER, timestamp INTEGER, is_unspent INTEGER DEFAULT 0, last_checked INTEGER DEFAULT 0, UNIQUE(txid, output_index) ) ''') cursor.execute(''' CREATE TABLE IF NOT EXISTS scan_progress ( id INTEGER PRIMARY KEY CHECK (id = 1), last_scanned_block INTEGER DEFAULT 0, total_p2pk_found INTEGER DEFAULT 0 ) ''') # Inizializza il progresso se non esiste cursor.execute('INSERT OR IGNORE INTO scan_progress (id, last_scanned_block) VALUES (1, 0)') conn.commit() conn.close() def get_block_hash(self, height: int) -> Optional[str]: """Ottieni l'hash del blocco data l'altezza""" try: response = requests.get(f"{self.api_base}/block-height/{height}", timeout=10) if response.status_code == 200: return response.text.strip() except Exception as e: print(f"Errore ottenendo hash per blocco {height}: {e}") return None def get_block_transactions(self, block_hash: str) -> Optional[List[Dict]]: """Ottieni le transazioni di un blocco con paginazione""" try: all_transactions = [] start_index = 0 # L'API restituisce massimo 25 transazioni per volta while True: url = f"{self.api_base}/block/{block_hash}/txs/{start_index}" response = requests.get(url, timeout=30) if response.status_code != 200: break txs = response.json() if not txs or not isinstance(txs, list) or len(txs) == 0: break all_transactions.extend(txs) # Se riceviamo meno di 25 transazioni, siamo alla fine if len(txs) < 25: break start_index += 25 time.sleep(0.1) # Piccola pausa tra le richieste paginate return all_transactions if all_transactions else None except Exception as e: print(f" ⚠️ Errore ottenendo transazioni per blocco {block_hash}: {e}") return None def extract_p2pk_from_transaction(self, tx: Dict) -> List[Dict]: """Estrai output P2PK da una transazione""" p2pk_outputs = [] for idx, output in enumerate(tx.get('vout', [])): scriptpubkey_type = output.get('scriptpubkey_type', '') scriptpubkey = output.get('scriptpubkey', '') scriptpubkey_asm = output.get('scriptpubkey_asm', '') # Metodo 1: Controlla il tipo esplicito is_p2pk_type = scriptpubkey_type in ['pubkey', 'p2pk'] # Metodo 2: Controlla la lunghezza dello script (P2PK = 67 o 35 byte) # 67 byte = chiave pubblica non compressa (04 + 64 hex chars) + OP_CHECKSIG # 35 byte = chiave pubblica compressa (02/03 + 32 hex chars) + OP_CHECKSIG script_len = len(scriptpubkey) // 2 if scriptpubkey else 0 is_p2pk_length = script_len in [67, 35] # Metodo 3: Controlla il pattern ASM (PUBLIC_KEY OP_CHECKSIG) is_p2pk_asm = False if scriptpubkey_asm: parts = scriptpubkey_asm.split() # P2PK ha esattamente 2 elementi: chiave pubblica + OP_CHECKSIG if len(parts) == 2 and parts[1] in ['OP_CHECKSIG', 'CHECKSIG']: # Verifica che il primo elemento sia una chiave pubblica valida pubkey = parts[0] # Chiave non compressa: 130 caratteri (65 byte) # Chiave compressa: 66 caratteri (33 byte) if len(pubkey) in [130, 66]: is_p2pk_asm = True # Metodo 4: Controlla il pattern hex dello scriptPubKey is_p2pk_hex = False if scriptpubkey: # P2PK non compresso: 41 (65 byte pubkey) + pubkey + ac (OP_CHECKSIG) # P2PK compresso: 21 (33 byte pubkey) + pubkey + ac (OP_CHECKSIG) if scriptpubkey.endswith('ac'): # OP_CHECKSIG if scriptpubkey.startswith('41') and len(scriptpubkey) == 134: # 67 byte * 2 is_p2pk_hex = True elif scriptpubkey.startswith('21') and len(scriptpubkey) == 70: # 35 byte * 2 is_p2pk_hex = True # Se uno dei metodi identifica P2PK if is_p2pk_type or is_p2pk_length or is_p2pk_asm or is_p2pk_hex: p2pk_data = { 'txid': tx['txid'], 'output_index': idx, # Usa l'indice dell'array 'scriptpubkey': scriptpubkey, 'value': output.get('value', 0), 'asm': scriptpubkey_asm } p2pk_outputs.append(p2pk_data) return p2pk_outputs def save_p2pk_data(self, block_height: int, p2pk_data: List[Dict], block_timestamp: int): """Salva i dati P2PK nel database e verifica lo stato UTXO""" if not p2pk_data: return conn = sqlite3.connect(self.db_path) cursor = conn.cursor() for data in p2pk_data: try: # Verifica se l'UTXO è ancora non speso utxo_status = self.check_utxo_status(data['txid'], data['output_index']) is_unspent = 1 if utxo_status['is_unspent'] else 0 current_time = int(time.time()) cursor.execute(''' INSERT OR REPLACE INTO p2pk_addresses (block_height, txid, output_index, scriptpubkey, value_satoshi, timestamp, is_unspent, last_checked) VALUES (?, ?, ?, ?, ?, ?, ?, ?) ''', ( block_height, data['txid'], data['output_index'], data['scriptpubkey'], data['value'], # Il valore è già in satoshi dall'API block_timestamp, is_unspent, current_time )) # Log dello stato if is_unspent: print(f" 💰 UTXO NON SPESO! Valore: {data['value']} sat") except Exception as e: print(f"Errore salvataggio dato: {e}") # Aggiorna il progresso cursor.execute(''' UPDATE scan_progress SET last_scanned_block = ?, total_p2pk_found = total_p2pk_found + ? WHERE id = 1 ''', (block_height, len(p2pk_data))) conn.commit() conn.close() def get_block_timestamp(self, block_hash: str) -> Optional[int]: """Ottieni timestamp del blocco""" try: response = requests.get(f"{self.api_base}/block/{block_hash}", timeout=10) if response.status_code == 200: block_data = response.json() return block_data.get('timestamp', 0) except Exception: pass return None def check_utxo_status(self, txid: str, vout: int) -> Dict: """Verifica se l'UTXO è ancora non speso (ha saldo)""" try: response = requests.get(f"{self.api_base}/tx/{txid}/outspend/{vout}", timeout=10) if response.status_code == 200: data = response.json() # Se 'spent' è False, l'UTXO non è stato speso is_unspent = not data.get('spent', True) return { 'is_unspent': is_unspent, 'spent_txid': data.get('txid') if data.get('spent') else None, 'spent_vin': data.get('vin') if data.get('spent') else None } except Exception as e: print(f" ⚠️ Errore verifica UTXO {txid}:{vout} - {e}") return {'is_unspent': False, 'spent_txid': None, 'spent_vin': None} def get_last_scanned_block(self) -> int: """Ottieni l'ultimo blocco scannerizzato""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute('SELECT last_scanned_block FROM scan_progress WHERE id = 1') result = cursor.fetchone() conn.close() return result[0] if result else 0 def scan_blocks(self, start_block: int, end_block: int, delay: float = 0.5): """Scannerizza blocchi in un range""" print(f"🚀 Inizio scansione blocchi {start_block} a {end_block}") print(f"Database: {self.db_path}") for height in range(start_block, end_block + 1): try: print(f"\n📦 Analisi blocco {height}...") # Ottieni hash del blocco block_hash = self.get_block_hash(height) if not block_hash: print(f" ⚠️ Hash non trovato per blocco {height}") continue # Ottieni timestamp timestamp = self.get_block_timestamp(block_hash) or 0 # Ottieni transazioni transactions = self.get_block_transactions(block_hash) if transactions is None: print(f" ⚠️ Transazioni non trovate per blocco {height}") continue # Processa transazioni total_p2pk_in_block = 0 all_p2pk_in_block = [] for tx in transactions: p2pk_outputs = self.extract_p2pk_from_transaction(tx) if p2pk_outputs: total_p2pk_in_block += len(p2pk_outputs) all_p2pk_in_block.extend(p2pk_outputs) for p2pk in p2pk_outputs: print(f" ✅ P2PK in TX {tx['txid'][:16]}... | Valore: {p2pk['value']} sat | Script: {p2pk['scriptpubkey'][:20]}...") # Salva tutti i P2PK trovati nel blocco if all_p2pk_in_block: self.save_p2pk_data(height, all_p2pk_in_block, timestamp) print(f" 📊 Blocco {height}: {total_p2pk_in_block} P2PK trovati e salvati") else: # Aggiorna comunque il progresso anche se non ci sono P2PK conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute('UPDATE scan_progress SET last_scanned_block = ? WHERE id = 1', (height,)) conn.commit() conn.close() print(f" 🔍 Blocco {height}: Nessun P2PK trovato") # Aggiorna progresso ogni 10 blocchi if height % 10 == 0: self.print_progress() # Rispetta l'API (rate limiting) time.sleep(delay) except KeyboardInterrupt: print("\n⏸️ Scansione interrotta dall'utente") break except Exception as e: print(f"❌ Errore durante scansione blocco {height}: {e}") time.sleep(2) # Pausa più lunga in caso di errore self.print_final_report() def print_progress(self): """Stampa il progresso attuale""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute('SELECT last_scanned_block, total_p2pk_found FROM scan_progress WHERE id = 1') progress = cursor.fetchone() cursor.execute('SELECT COUNT(DISTINCT block_height) FROM p2pk_addresses') blocks_with_p2pk = cursor.fetchone()[0] cursor.execute('SELECT COUNT(*) FROM p2pk_addresses') total_p2pk = cursor.fetchone()[0] conn.close() print(f"\n{'='*50}") print(f"📈 PROGRESSO SCANSIONE") print(f"📦 Ultimo blocco: {progress[0]}") print(f"🔑 P2PK totali: {progress[1]}") print(f"📊 Blocchi con P2PK: {blocks_with_p2pk}") print(f"📝 Indirizzi unici: {total_p2pk}") print(f"{'='*50}\n") def print_final_report(self): """Stampa report finale""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # Statistiche finali cursor.execute('SELECT MAX(block_height), MIN(block_height) FROM p2pk_addresses') max_block, min_block = cursor.fetchone() cursor.execute('SELECT SUM(value_satoshi) FROM p2pk_addresses') total_value = cursor.fetchone()[0] or 0 cursor.execute('SELECT COUNT(*) FROM p2pk_addresses') total_addresses = cursor.fetchone()[0] print(f"\n{'='*60}") print(f"🎉 SCANSIONE COMPLETATA") print(f"{'='*60}") print(f"📊 Totale indirizzi P2PK: {total_addresses}") print(f"📦 Primo blocco con P2PK: {min_block or 'Nessuno'}") print(f"📦 Ultimo blocco con P2PK: {max_block or 'Nessuno'}") print(f"💰 Valore totale: {total_value / 100000000:.8f} BTC") print(f"💾 Database: {self.db_path}") print(f"{'='*60}") # Esempio query per estrarre chiavi pubbliche if total_addresses > 0: print(f"\n📋 Esempio dati raccolti:") cursor.execute('SELECT scriptpubkey FROM p2pk_addresses LIMIT 3') samples = cursor.fetchall() for i, sample in enumerate(samples): print(f" {i+1}. {sample[0][:50]}...") conn.close() def export_to_csv(self, output_file: str = "p2pk_data.csv"): """Esporta dati in CSV per analisi""" import csv conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(''' SELECT block_height, txid, output_index, scriptpubkey, value_satoshi, timestamp FROM p2pk_addresses ORDER BY block_height ''') with open(output_file, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) writer.writerow(['Block', 'TXID', 'Output Index', 'ScriptPubKey', 'Value (satoshi)', 'Timestamp']) for row in cursor.fetchall(): writer.writerow(row) conn.close() print(f"📁 Dati esportati in: {output_file}") # USO DIDATTICO if __name__ == "__main__": # AVVERTENZA IMPORTANTE print("⚠️ AVVISO: Questo script è SOLO per SCOPI EDUCATIVI") print("⛔ NON usare per attività illegali") print("📚 Scopo: Studio della struttura blockchain Bitcoin\n") # Crea scanner scanner = P2PKBlockchainScanner("bitcoin_p2pk_study.db") # Recupera l'ultimo blocco scannerizzato last_scanned = scanner.get_last_scanned_block() print(f"📊 Ultimo blocco scannerizzato: {last_scanned}") print(f"💡 I primi blocchi di Bitcoin (1-10000) contengono molti P2PK") # Chiedi all'utente il blocco di inizio start_input = input(f"\n📍 Blocco di inizio scansione (default: {last_scanned + 1 if last_scanned > 0 else 1}): ").strip() if start_input: try: START_BLOCK = int(start_input) if START_BLOCK < 1: print("⚠️ Il blocco deve essere >= 1, uso blocco 1") START_BLOCK = 1 except ValueError: print(f"⚠️ Valore non valido, uso blocco {last_scanned + 1 if last_scanned > 0 else 1}") START_BLOCK = last_scanned + 1 if last_scanned > 0 else 1 else: START_BLOCK = last_scanned + 1 if last_scanned > 0 else 1 # Chiedi all'utente il blocco finale end_input = input(f"📍 Blocco finale scansione (default: {START_BLOCK + 999}): ").strip() if end_input: try: END_BLOCK = int(end_input) if END_BLOCK < START_BLOCK: print(f"⚠️ Il blocco finale deve essere >= {START_BLOCK}, uso {START_BLOCK + 999}") END_BLOCK = START_BLOCK + 999 except ValueError: print(f"⚠️ Valore non valido, uso blocco {START_BLOCK + 999}") END_BLOCK = START_BLOCK + 999 else: END_BLOCK = START_BLOCK + 999 # Chiedi il delay tra le richieste delay_input = input("⏱️ Delay tra richieste in secondi (default: 1.0): ").strip() if delay_input: try: REQUEST_DELAY = float(delay_input) if REQUEST_DELAY < 0.1: print("⚠️ Il delay minimo è 0.1s per rispettare l'API") REQUEST_DELAY = 0.1 except ValueError: print("⚠️ Valore non valido, uso 1.0s") REQUEST_DELAY = 1.0 else: REQUEST_DELAY = 1.0 print(f"\n🔧 Configurazione:") print(f" Primo blocco: {START_BLOCK}") print(f" Ultimo blocco: {END_BLOCK}") print(f" Totale blocchi: {END_BLOCK - START_BLOCK + 1}") print(f" Delay richieste: {REQUEST_DELAY}s") print(f" Tempo stimato: ~{((END_BLOCK - START_BLOCK + 1) * REQUEST_DELAY) / 60:.1f} minuti") # Conferma response = input("\n▶️ Avviare la scansione? (s/n): ").strip().lower() if response == 's': scanner.scan_blocks(START_BLOCK, END_BLOCK, REQUEST_DELAY) # Esporta in CSV export = input("\n📤 Esportare dati in CSV? (s/n): ").strip().lower() if export == 's': scanner.export_to_csv() else: print("⏹️ Scansione annullata") print("\n💡 Per analisi ulteriori:") print(" 1. Usa SQLite per query: SELECT * FROM p2pk_addresses LIMIT 10") print(" 2. Analizza gli script P2PK con librerie Bitcoin") print(" 3. Studia la struttura delle transazioni early Bitcoin")