Files
p2pk-bf/databases/scan_blockchain.py
2026-01-23 07:57:53 +01:00

466 lines
19 KiB
Python

import requests
import sqlite3
import time
import json
from typing import Optional, List, Dict
class P2PKBlockchainScanner:
def __init__(self, db_path: str = "bitcoin_p2pk.db"):
"""Inizializza lo scanner con connessione al database"""
self.db_path = db_path
self.api_base = "https://mempool.space/api"
self.init_database()
def init_database(self):
"""Crea il database SQLite per memorizzare i P2PK"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS p2pk_addresses (
id INTEGER PRIMARY KEY AUTOINCREMENT,
block_height INTEGER NOT NULL,
txid TEXT NOT NULL,
output_index INTEGER NOT NULL,
scriptpubkey TEXT NOT NULL,
value_satoshi INTEGER,
timestamp INTEGER,
is_unspent INTEGER DEFAULT 0,
last_checked INTEGER DEFAULT 0,
UNIQUE(txid, output_index)
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS scan_progress (
id INTEGER PRIMARY KEY CHECK (id = 1),
last_scanned_block INTEGER DEFAULT 0,
total_p2pk_found INTEGER DEFAULT 0
)
''')
# Inizializza il progresso se non esiste
cursor.execute('INSERT OR IGNORE INTO scan_progress (id, last_scanned_block) VALUES (1, 0)')
conn.commit()
conn.close()
def get_block_hash(self, height: int) -> Optional[str]:
"""Ottieni l'hash del blocco data l'altezza"""
try:
response = requests.get(f"{self.api_base}/block-height/{height}", timeout=10)
if response.status_code == 200:
return response.text.strip()
except Exception as e:
print(f"Errore ottenendo hash per blocco {height}: {e}")
return None
def get_block_transactions(self, block_hash: str) -> Optional[List[Dict]]:
"""Ottieni le transazioni di un blocco con paginazione"""
try:
all_transactions = []
start_index = 0
# L'API restituisce massimo 25 transazioni per volta
while True:
url = f"{self.api_base}/block/{block_hash}/txs/{start_index}"
response = requests.get(url, timeout=30)
if response.status_code != 200:
break
txs = response.json()
if not txs or not isinstance(txs, list) or len(txs) == 0:
break
all_transactions.extend(txs)
# Se riceviamo meno di 25 transazioni, siamo alla fine
if len(txs) < 25:
break
start_index += 25
time.sleep(0.1) # Piccola pausa tra le richieste paginate
return all_transactions if all_transactions else None
except Exception as e:
print(f" ⚠️ Errore ottenendo transazioni per blocco {block_hash}: {e}")
return None
def extract_p2pk_from_transaction(self, tx: Dict) -> List[Dict]:
"""Estrai output P2PK da una transazione"""
p2pk_outputs = []
for idx, output in enumerate(tx.get('vout', [])):
scriptpubkey_type = output.get('scriptpubkey_type', '')
scriptpubkey = output.get('scriptpubkey', '')
scriptpubkey_asm = output.get('scriptpubkey_asm', '')
# Metodo 1: Controlla il tipo esplicito
is_p2pk_type = scriptpubkey_type in ['pubkey', 'p2pk']
# Metodo 2: Controlla la lunghezza dello script (P2PK = 67 o 35 byte)
# 67 byte = chiave pubblica non compressa (04 + 64 hex chars) + OP_CHECKSIG
# 35 byte = chiave pubblica compressa (02/03 + 32 hex chars) + OP_CHECKSIG
script_len = len(scriptpubkey) // 2 if scriptpubkey else 0
is_p2pk_length = script_len in [67, 35]
# Metodo 3: Controlla il pattern ASM (PUBLIC_KEY OP_CHECKSIG)
is_p2pk_asm = False
if scriptpubkey_asm:
parts = scriptpubkey_asm.split()
# P2PK ha esattamente 2 elementi: chiave pubblica + OP_CHECKSIG
if len(parts) == 2 and parts[1] in ['OP_CHECKSIG', 'CHECKSIG']:
# Verifica che il primo elemento sia una chiave pubblica valida
pubkey = parts[0]
# Chiave non compressa: 130 caratteri (65 byte)
# Chiave compressa: 66 caratteri (33 byte)
if len(pubkey) in [130, 66]:
is_p2pk_asm = True
# Metodo 4: Controlla il pattern hex dello scriptPubKey
is_p2pk_hex = False
if scriptpubkey:
# P2PK non compresso: 41 (65 byte pubkey) + pubkey + ac (OP_CHECKSIG)
# P2PK compresso: 21 (33 byte pubkey) + pubkey + ac (OP_CHECKSIG)
if scriptpubkey.endswith('ac'): # OP_CHECKSIG
if scriptpubkey.startswith('41') and len(scriptpubkey) == 134: # 67 byte * 2
is_p2pk_hex = True
elif scriptpubkey.startswith('21') and len(scriptpubkey) == 70: # 35 byte * 2
is_p2pk_hex = True
# Se uno dei metodi identifica P2PK
if is_p2pk_type or is_p2pk_length or is_p2pk_asm or is_p2pk_hex:
p2pk_data = {
'txid': tx['txid'],
'output_index': idx, # Usa l'indice dell'array
'scriptpubkey': scriptpubkey,
'value': output.get('value', 0),
'asm': scriptpubkey_asm
}
p2pk_outputs.append(p2pk_data)
return p2pk_outputs
def save_p2pk_data(self, block_height: int, p2pk_data: List[Dict], block_timestamp: int):
"""Salva i dati P2PK nel database e verifica lo stato UTXO"""
if not p2pk_data:
return
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
for data in p2pk_data:
try:
# Verifica se l'UTXO è ancora non speso
utxo_status = self.check_utxo_status(data['txid'], data['output_index'])
is_unspent = 1 if utxo_status['is_unspent'] else 0
current_time = int(time.time())
cursor.execute('''
INSERT OR REPLACE INTO p2pk_addresses
(block_height, txid, output_index, scriptpubkey, value_satoshi, timestamp, is_unspent, last_checked)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (
block_height,
data['txid'],
data['output_index'],
data['scriptpubkey'],
data['value'], # Il valore è già in satoshi dall'API
block_timestamp,
is_unspent,
current_time
))
# Log dello stato
if is_unspent:
print(f" 💰 UTXO NON SPESO! Valore: {data['value']} sat")
except Exception as e:
print(f"Errore salvataggio dato: {e}")
# Aggiorna il progresso
cursor.execute('''
UPDATE scan_progress
SET last_scanned_block = ?,
total_p2pk_found = total_p2pk_found + ?
WHERE id = 1
''', (block_height, len(p2pk_data)))
conn.commit()
conn.close()
def get_block_timestamp(self, block_hash: str) -> Optional[int]:
"""Ottieni timestamp del blocco"""
try:
response = requests.get(f"{self.api_base}/block/{block_hash}", timeout=10)
if response.status_code == 200:
block_data = response.json()
return block_data.get('timestamp', 0)
except Exception:
pass
return None
def check_utxo_status(self, txid: str, vout: int) -> Dict:
"""Verifica se l'UTXO è ancora non speso (ha saldo)"""
try:
response = requests.get(f"{self.api_base}/tx/{txid}/outspend/{vout}", timeout=10)
if response.status_code == 200:
data = response.json()
# Se 'spent' è False, l'UTXO non è stato speso
is_unspent = not data.get('spent', True)
return {
'is_unspent': is_unspent,
'spent_txid': data.get('txid') if data.get('spent') else None,
'spent_vin': data.get('vin') if data.get('spent') else None
}
except Exception as e:
print(f" ⚠️ Errore verifica UTXO {txid}:{vout} - {e}")
return {'is_unspent': False, 'spent_txid': None, 'spent_vin': None}
def get_last_scanned_block(self) -> int:
"""Ottieni l'ultimo blocco scannerizzato"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('SELECT last_scanned_block FROM scan_progress WHERE id = 1')
result = cursor.fetchone()
conn.close()
return result[0] if result else 0
def scan_blocks(self, start_block: int, end_block: int, delay: float = 0.5):
"""Scannerizza blocchi in un range"""
print(f"🚀 Inizio scansione blocchi {start_block} a {end_block}")
print(f"Database: {self.db_path}")
for height in range(start_block, end_block + 1):
try:
print(f"\n📦 Analisi blocco {height}...")
# Ottieni hash del blocco
block_hash = self.get_block_hash(height)
if not block_hash:
print(f" ⚠️ Hash non trovato per blocco {height}")
continue
# Ottieni timestamp
timestamp = self.get_block_timestamp(block_hash) or 0
# Ottieni transazioni
transactions = self.get_block_transactions(block_hash)
if transactions is None:
print(f" ⚠️ Transazioni non trovate per blocco {height}")
continue
# Processa transazioni
total_p2pk_in_block = 0
all_p2pk_in_block = []
for tx in transactions:
p2pk_outputs = self.extract_p2pk_from_transaction(tx)
if p2pk_outputs:
total_p2pk_in_block += len(p2pk_outputs)
all_p2pk_in_block.extend(p2pk_outputs)
for p2pk in p2pk_outputs:
print(f" ✅ P2PK in TX {tx['txid'][:16]}... | Valore: {p2pk['value']} sat | Script: {p2pk['scriptpubkey'][:20]}...")
# Salva tutti i P2PK trovati nel blocco
if all_p2pk_in_block:
self.save_p2pk_data(height, all_p2pk_in_block, timestamp)
print(f" 📊 Blocco {height}: {total_p2pk_in_block} P2PK trovati e salvati")
else:
# Aggiorna comunque il progresso anche se non ci sono P2PK
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('UPDATE scan_progress SET last_scanned_block = ? WHERE id = 1', (height,))
conn.commit()
conn.close()
print(f" 🔍 Blocco {height}: Nessun P2PK trovato")
# Aggiorna progresso ogni 10 blocchi
if height % 10 == 0:
self.print_progress()
# Rispetta l'API (rate limiting)
time.sleep(delay)
except KeyboardInterrupt:
print("\n⏸️ Scansione interrotta dall'utente")
break
except Exception as e:
print(f"❌ Errore durante scansione blocco {height}: {e}")
time.sleep(2) # Pausa più lunga in caso di errore
self.print_final_report()
def print_progress(self):
"""Stampa il progresso attuale"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('SELECT last_scanned_block, total_p2pk_found FROM scan_progress WHERE id = 1')
progress = cursor.fetchone()
cursor.execute('SELECT COUNT(DISTINCT block_height) FROM p2pk_addresses')
blocks_with_p2pk = cursor.fetchone()[0]
cursor.execute('SELECT COUNT(*) FROM p2pk_addresses')
total_p2pk = cursor.fetchone()[0]
conn.close()
print(f"\n{'='*50}")
print(f"📈 PROGRESSO SCANSIONE")
print(f"📦 Ultimo blocco: {progress[0]}")
print(f"🔑 P2PK totali: {progress[1]}")
print(f"📊 Blocchi con P2PK: {blocks_with_p2pk}")
print(f"📝 Indirizzi unici: {total_p2pk}")
print(f"{'='*50}\n")
def print_final_report(self):
"""Stampa report finale"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Statistiche finali
cursor.execute('SELECT MAX(block_height), MIN(block_height) FROM p2pk_addresses')
max_block, min_block = cursor.fetchone()
cursor.execute('SELECT SUM(value_satoshi) FROM p2pk_addresses')
total_value = cursor.fetchone()[0] or 0
cursor.execute('SELECT COUNT(*) FROM p2pk_addresses')
total_addresses = cursor.fetchone()[0]
print(f"\n{'='*60}")
print(f"🎉 SCANSIONE COMPLETATA")
print(f"{'='*60}")
print(f"📊 Totale indirizzi P2PK: {total_addresses}")
print(f"📦 Primo blocco con P2PK: {min_block or 'Nessuno'}")
print(f"📦 Ultimo blocco con P2PK: {max_block or 'Nessuno'}")
print(f"💰 Valore totale: {total_value / 100000000:.8f} BTC")
print(f"💾 Database: {self.db_path}")
print(f"{'='*60}")
# Esempio query per estrarre chiavi pubbliche
if total_addresses > 0:
print(f"\n📋 Esempio dati raccolti:")
cursor.execute('SELECT scriptpubkey FROM p2pk_addresses LIMIT 3')
samples = cursor.fetchall()
for i, sample in enumerate(samples):
print(f" {i+1}. {sample[0][:50]}...")
conn.close()
def export_to_csv(self, output_file: str = "p2pk_data.csv"):
"""Esporta dati in CSV per analisi"""
import csv
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
SELECT block_height, txid, output_index,
scriptpubkey, value_satoshi, timestamp
FROM p2pk_addresses
ORDER BY block_height
''')
with open(output_file, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow(['Block', 'TXID', 'Output Index',
'ScriptPubKey', 'Value (satoshi)', 'Timestamp'])
for row in cursor.fetchall():
writer.writerow(row)
conn.close()
print(f"📁 Dati esportati in: {output_file}")
# USO DIDATTICO
if __name__ == "__main__":
# AVVERTENZA IMPORTANTE
print("⚠️ AVVISO: Questo script è SOLO per SCOPI EDUCATIVI")
print("⛔ NON usare per attività illegali")
print("📚 Scopo: Studio della struttura blockchain Bitcoin\n")
# Crea scanner
scanner = P2PKBlockchainScanner("bitcoin_p2pk_study.db")
# Recupera l'ultimo blocco scannerizzato
last_scanned = scanner.get_last_scanned_block()
print(f"📊 Ultimo blocco scannerizzato: {last_scanned}")
print(f"💡 I primi blocchi di Bitcoin (1-10000) contengono molti P2PK")
# Chiedi all'utente il blocco di inizio
start_input = input(f"\n📍 Blocco di inizio scansione (default: {last_scanned + 1 if last_scanned > 0 else 1}): ").strip()
if start_input:
try:
START_BLOCK = int(start_input)
if START_BLOCK < 1:
print("⚠️ Il blocco deve essere >= 1, uso blocco 1")
START_BLOCK = 1
except ValueError:
print(f"⚠️ Valore non valido, uso blocco {last_scanned + 1 if last_scanned > 0 else 1}")
START_BLOCK = last_scanned + 1 if last_scanned > 0 else 1
else:
START_BLOCK = last_scanned + 1 if last_scanned > 0 else 1
# Chiedi all'utente il blocco finale
end_input = input(f"📍 Blocco finale scansione (default: {START_BLOCK + 999}): ").strip()
if end_input:
try:
END_BLOCK = int(end_input)
if END_BLOCK < START_BLOCK:
print(f"⚠️ Il blocco finale deve essere >= {START_BLOCK}, uso {START_BLOCK + 999}")
END_BLOCK = START_BLOCK + 999
except ValueError:
print(f"⚠️ Valore non valido, uso blocco {START_BLOCK + 999}")
END_BLOCK = START_BLOCK + 999
else:
END_BLOCK = START_BLOCK + 999
# Chiedi il delay tra le richieste
delay_input = input("⏱️ Delay tra richieste in secondi (default: 1.0): ").strip()
if delay_input:
try:
REQUEST_DELAY = float(delay_input)
if REQUEST_DELAY < 0.1:
print("⚠️ Il delay minimo è 0.1s per rispettare l'API")
REQUEST_DELAY = 0.1
except ValueError:
print("⚠️ Valore non valido, uso 1.0s")
REQUEST_DELAY = 1.0
else:
REQUEST_DELAY = 1.0
print(f"\n🔧 Configurazione:")
print(f" Primo blocco: {START_BLOCK}")
print(f" Ultimo blocco: {END_BLOCK}")
print(f" Totale blocchi: {END_BLOCK - START_BLOCK + 1}")
print(f" Delay richieste: {REQUEST_DELAY}s")
print(f" Tempo stimato: ~{((END_BLOCK - START_BLOCK + 1) * REQUEST_DELAY) / 60:.1f} minuti")
# Conferma
response = input("\n▶️ Avviare la scansione? (s/n): ").strip().lower()
if response == 's':
scanner.scan_blocks(START_BLOCK, END_BLOCK, REQUEST_DELAY)
# Esporta in CSV
export = input("\n📤 Esportare dati in CSV? (s/n): ").strip().lower()
if export == 's':
scanner.export_to_csv()
else:
print("⏹️ Scansione annullata")
print("\n💡 Per analisi ulteriori:")
print(" 1. Usa SQLite per query: SELECT * FROM p2pk_addresses LIMIT 10")
print(" 2. Analizza gli script P2PK con librerie Bitcoin")
print(" 3. Studia la struttura delle transazioni early Bitcoin")