feat: add Electrum-compatible wallet encryption
- Add src/crypto.py with AES-256-CBC pw_encode/pw_decode using double SHA256 key derivation (Electrum standard) - Add encrypt_wallet() and decrypt_wallet() to src/hd_wallet.py - Prompt for encryption password when saving HD wallet to file - Add 13 encryption/decryption tests to tests/test_hd_wallet.py - Add pycryptodome to requirements.txt
This commit is contained in:
@@ -4,3 +4,4 @@ ecdsa==0.19.0
|
|||||||
six==1.17.0
|
six==1.17.0
|
||||||
pytest
|
pytest
|
||||||
bip-utils
|
bip-utils
|
||||||
|
pycryptodome
|
||||||
|
|||||||
45
src/crypto.py
Normal file
45
src/crypto.py
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
"""
|
||||||
|
Electrum-compatible AES-256-CBC wallet encryption.
|
||||||
|
|
||||||
|
Key derivation: sha256(sha256(password)) — double SHA256, 32 bytes.
|
||||||
|
Format: base64(iv[16] + AES-CBC-ciphertext)
|
||||||
|
"""
|
||||||
|
|
||||||
|
import base64
|
||||||
|
import hashlib
|
||||||
|
import os
|
||||||
|
|
||||||
|
from Crypto.Cipher import AES
|
||||||
|
from Crypto.Util.Padding import pad, unpad
|
||||||
|
|
||||||
|
|
||||||
|
def _derive_key(password: str) -> bytes:
|
||||||
|
"""Derive 32-byte AES key from password using double SHA256 (Electrum standard)."""
|
||||||
|
return hashlib.sha256(hashlib.sha256(password.encode('utf-8')).digest()).digest()
|
||||||
|
|
||||||
|
|
||||||
|
def pw_encode(data: str, password: str) -> str:
|
||||||
|
"""
|
||||||
|
Encrypt a UTF-8 string using AES-256-CBC with a random IV.
|
||||||
|
Returns base64(iv + ciphertext), compatible with Electrum's pw_encode.
|
||||||
|
"""
|
||||||
|
key = _derive_key(password)
|
||||||
|
iv = os.urandom(16)
|
||||||
|
cipher = AES.new(key, AES.MODE_CBC, iv)
|
||||||
|
encrypted = iv + cipher.encrypt(pad(data.encode('utf-8'), AES.block_size))
|
||||||
|
return base64.b64encode(encrypted).decode('utf-8')
|
||||||
|
|
||||||
|
|
||||||
|
def pw_decode(data: str, password: str) -> str:
|
||||||
|
"""
|
||||||
|
Decrypt a base64-encoded AES-256-CBC string.
|
||||||
|
Raises ValueError on incorrect password or corrupted data.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
key = _derive_key(password)
|
||||||
|
raw = base64.b64decode(data)
|
||||||
|
iv, ciphertext = raw[:16], raw[16:]
|
||||||
|
cipher = AES.new(key, AES.MODE_CBC, iv)
|
||||||
|
return unpad(cipher.decrypt(ciphertext), AES.block_size).decode('utf-8')
|
||||||
|
except Exception:
|
||||||
|
raise ValueError("Incorrect password or corrupted data.")
|
||||||
@@ -1,7 +1,13 @@
|
|||||||
|
import copy
|
||||||
import hashlib
|
import hashlib
|
||||||
import json
|
import json
|
||||||
from typing import Dict, List, Optional
|
from typing import Dict, List, Optional
|
||||||
|
|
||||||
|
try:
|
||||||
|
from src.crypto import pw_encode, pw_decode
|
||||||
|
except ImportError:
|
||||||
|
from crypto import pw_encode, pw_decode
|
||||||
|
|
||||||
from bip_utils import (
|
from bip_utils import (
|
||||||
Bip39MnemonicGenerator, Bip39WordsNum, Bip39SeedGenerator,
|
Bip39MnemonicGenerator, Bip39WordsNum, Bip39SeedGenerator,
|
||||||
Bip44, Bip44Coins, Bip44Changes,
|
Bip44, Bip44Coins, Bip44Changes,
|
||||||
@@ -122,6 +128,50 @@ def generate_hd_wallet(
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def encrypt_wallet(wallet: Dict, password: str) -> Dict:
|
||||||
|
"""
|
||||||
|
Encrypt sensitive wallet fields using Electrum-compatible AES-256-CBC.
|
||||||
|
|
||||||
|
Encrypted fields:
|
||||||
|
- keystore.seed, keystore.xprv, keystore.passphrase
|
||||||
|
- addresses.receiving[i].private_key_hex, private_key_wif
|
||||||
|
|
||||||
|
The rest (xpub, derivation path, addresses) stays in plaintext,
|
||||||
|
consistent with Electrum's format.
|
||||||
|
|
||||||
|
Returns a deep copy of the wallet with use_encryption set to True.
|
||||||
|
"""
|
||||||
|
w = copy.deepcopy(wallet)
|
||||||
|
ks = w['keystore']
|
||||||
|
ks['seed'] = pw_encode(ks['seed'], password)
|
||||||
|
ks['xprv'] = pw_encode(ks['xprv'], password)
|
||||||
|
ks['passphrase'] = pw_encode(ks['passphrase'], password)
|
||||||
|
for addr in w['addresses']['receiving']:
|
||||||
|
addr['private_key_hex'] = pw_encode(addr['private_key_hex'], password)
|
||||||
|
addr['private_key_wif'] = pw_encode(addr['private_key_wif'], password)
|
||||||
|
w['use_encryption'] = True
|
||||||
|
return w
|
||||||
|
|
||||||
|
|
||||||
|
def decrypt_wallet(wallet: Dict, password: str) -> Dict:
|
||||||
|
"""
|
||||||
|
Decrypt an encrypted wallet dict. Raises ValueError on wrong password.
|
||||||
|
Returns a deep copy with all fields decrypted and use_encryption set to False.
|
||||||
|
"""
|
||||||
|
if not wallet.get('use_encryption'):
|
||||||
|
return copy.deepcopy(wallet)
|
||||||
|
w = copy.deepcopy(wallet)
|
||||||
|
ks = w['keystore']
|
||||||
|
ks['seed'] = pw_decode(ks['seed'], password)
|
||||||
|
ks['xprv'] = pw_decode(ks['xprv'], password)
|
||||||
|
ks['passphrase'] = pw_decode(ks['passphrase'], password)
|
||||||
|
for addr in w['addresses']['receiving']:
|
||||||
|
addr['private_key_hex'] = pw_decode(addr['private_key_hex'], password)
|
||||||
|
addr['private_key_wif'] = pw_decode(addr['private_key_wif'], password)
|
||||||
|
w['use_encryption'] = False
|
||||||
|
return w
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
print("=== HD Wallet Generator (BIP-32/39/44/49/84/86) ===")
|
print("=== HD Wallet Generator (BIP-32/39/44/49/84/86) ===")
|
||||||
|
|
||||||
@@ -167,9 +217,12 @@ def main():
|
|||||||
if filename:
|
if filename:
|
||||||
if not filename.endswith('.json'):
|
if not filename.endswith('.json'):
|
||||||
filename += '.json'
|
filename += '.json'
|
||||||
|
password = input("Encryption password (leave blank to save unencrypted): ").strip()
|
||||||
|
to_save = encrypt_wallet(result, password) if password else result
|
||||||
with open(filename, 'w') as f:
|
with open(filename, 'w') as f:
|
||||||
json.dump(result, f, indent=4)
|
json.dump(to_save, f, indent=4)
|
||||||
print(f"Saved to {filename}")
|
status = "encrypted" if password else "unencrypted"
|
||||||
|
print(f"Saved ({status}) to {filename}")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error: {e}")
|
print(f"Error: {e}")
|
||||||
|
|||||||
@@ -2,7 +2,8 @@ import sys
|
|||||||
import os
|
import os
|
||||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
||||||
|
|
||||||
from src.hd_wallet import generate_hd_wallet
|
from src.hd_wallet import generate_hd_wallet, encrypt_wallet, decrypt_wallet
|
||||||
|
from src.crypto import pw_decode
|
||||||
|
|
||||||
SECRETSCAN_URL = "https://secretscan.org/Bitcoin?address={}"
|
SECRETSCAN_URL = "https://secretscan.org/Bitcoin?address={}"
|
||||||
|
|
||||||
@@ -222,6 +223,103 @@ def test_hd_generates_mnemonic_when_none():
|
|||||||
assert len(result['keystore']['seed'].split()) == 12
|
assert len(result['keystore']['seed'].split()) == 12
|
||||||
|
|
||||||
|
|
||||||
|
# --- Encryption ---
|
||||||
|
|
||||||
|
TEST_PASSWORD = "correct_horse_battery_staple"
|
||||||
|
|
||||||
|
def test_hd_no_encryption_by_default():
|
||||||
|
result = generate_hd_wallet('mainnet', 'bip84', TEST_MNEMONIC, num_addresses=1)
|
||||||
|
assert result['use_encryption'] is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_hd_encryption_sets_flag():
|
||||||
|
result = generate_hd_wallet('mainnet', 'bip84', TEST_MNEMONIC, num_addresses=1)
|
||||||
|
encrypted = encrypt_wallet(result, TEST_PASSWORD)
|
||||||
|
assert encrypted['use_encryption'] is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_hd_encryption_encrypts_seed():
|
||||||
|
result = generate_hd_wallet('mainnet', 'bip84', TEST_MNEMONIC, num_addresses=1)
|
||||||
|
encrypted = encrypt_wallet(result, TEST_PASSWORD)
|
||||||
|
assert encrypted['keystore']['seed'] != TEST_MNEMONIC
|
||||||
|
|
||||||
|
|
||||||
|
def test_hd_encryption_encrypts_xprv():
|
||||||
|
result = generate_hd_wallet('mainnet', 'bip84', TEST_MNEMONIC, num_addresses=1)
|
||||||
|
original_xprv = result['keystore']['xprv']
|
||||||
|
encrypted = encrypt_wallet(result, TEST_PASSWORD)
|
||||||
|
assert encrypted['keystore']['xprv'] != original_xprv
|
||||||
|
|
||||||
|
|
||||||
|
def test_hd_encryption_leaves_xpub_plaintext():
|
||||||
|
result = generate_hd_wallet('mainnet', 'bip84', TEST_MNEMONIC, num_addresses=1)
|
||||||
|
encrypted = encrypt_wallet(result, TEST_PASSWORD)
|
||||||
|
assert encrypted['keystore']['xpub'] == result['keystore']['xpub']
|
||||||
|
|
||||||
|
|
||||||
|
def test_hd_encryption_leaves_addresses_plaintext():
|
||||||
|
result = generate_hd_wallet('mainnet', 'bip84', TEST_MNEMONIC, num_addresses=1)
|
||||||
|
encrypted = encrypt_wallet(result, TEST_PASSWORD)
|
||||||
|
assert encrypted['addresses']['receiving'][0]['address'] == result['addresses']['receiving'][0]['address']
|
||||||
|
assert encrypted['addresses']['receiving'][0]['public_key'] == result['addresses']['receiving'][0]['public_key']
|
||||||
|
|
||||||
|
|
||||||
|
def test_hd_encryption_encrypts_private_keys():
|
||||||
|
result = generate_hd_wallet('mainnet', 'bip84', TEST_MNEMONIC, num_addresses=2)
|
||||||
|
encrypted = encrypt_wallet(result, TEST_PASSWORD)
|
||||||
|
for i in range(2):
|
||||||
|
orig = result['addresses']['receiving'][i]
|
||||||
|
enc = encrypted['addresses']['receiving'][i]
|
||||||
|
assert enc['private_key_hex'] != orig['private_key_hex']
|
||||||
|
assert enc['private_key_wif'] != orig['private_key_wif']
|
||||||
|
|
||||||
|
|
||||||
|
def test_hd_encryption_round_trip_seed():
|
||||||
|
result = generate_hd_wallet('mainnet', 'bip84', TEST_MNEMONIC, num_addresses=1)
|
||||||
|
encrypted = encrypt_wallet(result, TEST_PASSWORD)
|
||||||
|
decrypted = decrypt_wallet(encrypted, TEST_PASSWORD)
|
||||||
|
assert decrypted['keystore']['seed'] == TEST_MNEMONIC
|
||||||
|
|
||||||
|
|
||||||
|
def test_hd_encryption_round_trip_xprv():
|
||||||
|
result = generate_hd_wallet('mainnet', 'bip84', TEST_MNEMONIC, num_addresses=1)
|
||||||
|
encrypted = encrypt_wallet(result, TEST_PASSWORD)
|
||||||
|
decrypted = decrypt_wallet(encrypted, TEST_PASSWORD)
|
||||||
|
assert decrypted['keystore']['xprv'] == result['keystore']['xprv']
|
||||||
|
|
||||||
|
|
||||||
|
def test_hd_encryption_round_trip_private_keys():
|
||||||
|
result = generate_hd_wallet('mainnet', 'bip84', TEST_MNEMONIC, num_addresses=3)
|
||||||
|
encrypted = encrypt_wallet(result, TEST_PASSWORD)
|
||||||
|
decrypted = decrypt_wallet(encrypted, TEST_PASSWORD)
|
||||||
|
for i in range(3):
|
||||||
|
assert decrypted['addresses']['receiving'][i]['private_key_hex'] == result['addresses']['receiving'][i]['private_key_hex']
|
||||||
|
assert decrypted['addresses']['receiving'][i]['private_key_wif'] == result['addresses']['receiving'][i]['private_key_wif']
|
||||||
|
|
||||||
|
|
||||||
|
def test_hd_decrypt_wrong_password():
|
||||||
|
result = generate_hd_wallet('mainnet', 'bip84', TEST_MNEMONIC, num_addresses=1)
|
||||||
|
encrypted = encrypt_wallet(result, TEST_PASSWORD)
|
||||||
|
try:
|
||||||
|
decrypt_wallet(encrypted, "wrong_password")
|
||||||
|
assert False, "Should have raised ValueError"
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def test_hd_encrypt_does_not_mutate_original():
|
||||||
|
result = generate_hd_wallet('mainnet', 'bip84', TEST_MNEMONIC, num_addresses=1)
|
||||||
|
original_seed = result['keystore']['seed']
|
||||||
|
encrypt_wallet(result, TEST_PASSWORD)
|
||||||
|
assert result['keystore']['seed'] == original_seed
|
||||||
|
|
||||||
|
|
||||||
|
def test_hd_decrypt_unencrypted_is_noop():
|
||||||
|
result = generate_hd_wallet('mainnet', 'bip84', TEST_MNEMONIC, num_addresses=1)
|
||||||
|
decrypted = decrypt_wallet(result, TEST_PASSWORD)
|
||||||
|
assert decrypted['keystore']['seed'] == result['keystore']['seed']
|
||||||
|
|
||||||
|
|
||||||
# --- SecretScan ---
|
# --- SecretScan ---
|
||||||
|
|
||||||
def test_hd_print_secretscan(capsys):
|
def test_hd_print_secretscan(capsys):
|
||||||
|
|||||||
Reference in New Issue
Block a user