Files
cpu-miner/prototype/block_builder.py

168 lines
6.3 KiB
Python
Raw Permalink Normal View History

2026-03-24 17:06:51 +01:00
import struct, logging
from binascii import unhexlify, hexlify
from utils import double_sha256, encode_varint
log = logging.getLogger(__name__)
def tx_encode_coinbase_height(height: int) -> str:
"""
Encode block height according to BIP34 (CScriptNum format) to include it
in the coinbase transaction scriptSig.
"""
if height < 0:
raise ValueError("Block height must be greater than or equal to 0.")
if height == 0:
return "00"
result = bytearray()
v = height
while v:
result.append(v & 0xff)
v >>= 8
# If the most significant bit of the last byte is 1, append 0x00
if result and (result[-1] & 0x80):
result.append(0x00)
return f"{len(result):02x}" + result.hex()
def is_segwit_tx(raw_hex: str) -> bool:
"""
Return True if the transaction is serialized in SegWit format.
"""
return len(raw_hex) >= 12 and raw_hex[8:12] == "0001"
def build_coinbase_transaction(template, miner_script_pubkey, extranonce1, extranonce2, coinbase_message=None):
"""Build a coinbase transaction for mining."""
height = template["height"]
reward = template["coinbasevalue"]
wc_raw = template.get("default_witness_commitment") # can be full script or root only
segwit = bool(wc_raw)
tx_version = struct.pack("<I", 2).hex() # coinbase tx version 2 (BIP68/SegWit)
parts = [tx_version]
if segwit:
parts.append("0001") # marker + flag
# ---- coinbase input ------------------------------------------------
parts += ["01", "00"*32, "ffffffff"]
script_sig = tx_encode_coinbase_height(height)
if coinbase_message:
m = coinbase_message.encode()
script_sig += "6a" + f"{len(m):02x}" + m.hex()
# Add extranonce1 and extranonce2 as required by Stratum V1
script_sig += extranonce1 + extranonce2
if len(script_sig)//2 > 100:
raise ValueError("scriptSig > 100 byte")
parts.append(encode_varint(len(script_sig)//2) + script_sig)
parts.append("ffffffff") # sequence
# ---- outputs -------------------------------------------------------
outputs = []
miner_out = struct.pack("<Q", reward).hex()
miner_out += encode_varint(len(miner_script_pubkey)//2) + miner_script_pubkey
outputs.append(miner_out)
if segwit:
# wc_script is already complete if it starts with '6a'
if wc_raw.startswith("6a"):
wc_script = wc_raw
else: # root only: build full script
wc_script = "6a24aa21a9ed" + wc_raw
outputs.append("00"*8 + encode_varint(len(wc_script)//2) + wc_script)
parts.append(encode_varint(len(outputs)) + "".join(outputs))
# ---- reserved witness ---------------------------------------------
if segwit:
parts += ["01", "20", "00"*32] # 1 item x 32 bytes
parts.append("00000000") # locktime
coinbase_hex = "".join(parts)
# ---------- legacy txid (without marker/flag + witness) -------------
if segwit:
# 1) remove marker+flag "0001"
core = tx_version + coinbase_hex[12:]
# 2) split locktime (last 8 hex chars)
locktime = core[-8:] # "00000000"
body = core[:-8] # everything except locktime
# 3) remove witness stack (68 hex) right before locktime
body_wo_wit = body[:-68] # remove only the 34 witness bytes
# 4) rebuild body + locktime
core = body_wo_wit + locktime
else:
core = coinbase_hex
txid = double_sha256(unhexlify(core))[::-1].hex()
return coinbase_hex, txid
def calculate_merkle_root(coinbase_txid: str, transactions: list[dict]) -> str:
"""Compute the Merkle root for a list of transaction IDs."""
# leaves in little-endian bytes format
tx_hashes = [unhexlify(coinbase_txid)[::-1]] + [
unhexlify(tx["hash"])[::-1] for tx in transactions
]
while len(tx_hashes) > 1:
if len(tx_hashes) % 2:
tx_hashes.append(tx_hashes[-1])
tx_hashes = [
double_sha256(tx_hashes[i] + tx_hashes[i+1])
for i in range(0, len(tx_hashes), 2)
]
return hexlify(tx_hashes[0][::-1]).decode()
def build_block_header(version, prev_hash, merkle_root, timestamp, bits, nonce):
"""
Build the 80-byte block header and return it as a hex string.
"""
# Build header by concatenating all fields in binary format
header = (
struct.pack("<I", version) + # Version (4 byte, little-endian)
unhexlify(prev_hash)[::-1] + # Previous Block Hash (32 bytes, reversed)
unhexlify(merkle_root)[::-1] + # Merkle Root (32 bytes, reversed)
struct.pack("<I", timestamp) + # Timestamp (4 byte, little-endian)
unhexlify(bits)[::-1] + # Bits/Target (4 bytes, reversed)
struct.pack("<I", nonce) # Nonce (4 byte, little-endian)
)
# Convert binary header to hexadecimal
return hexlify(header).decode()
def serialize_block(header_hex, coinbase_tx, transactions):
"""
Serialize the full block in Bitcoin wire format.
"""
# High-level message -> INFO
log.info("Block serialization started")
# Compute total transaction count (coinbase + regular transactions)
num_tx = len(transactions) + 1 # +1 to include coinbase
# Encode transaction count as hex VarInt
num_tx_hex = encode_varint(num_tx)
try:
# Concatenate all regular transactions in hex
transactions_hex = "".join(tx["data"] for tx in transactions)
except KeyError as e:
# Operational error -> ERROR (includes stack trace)
log.exception("A transaction is missing the 'data' field")
return None
# Assemble full block: header + tx count + coinbase + remaining txs
block_hex = header_hex + num_tx_hex + coinbase_tx + transactions_hex
# Success confirmation -> INFO
log.info("Block serialized successfully - %d total transactions", num_tx)
# Verbose detail (can be thousands of characters) -> DEBUG
log.debug("Block HEX: %s", block_hex)
# Return serialized block
return block_hex