168 lines
6.3 KiB
Python
168 lines
6.3 KiB
Python
import struct, logging
|
|
from binascii import unhexlify, hexlify
|
|
from utils import double_sha256, encode_varint
|
|
|
|
# Module-level logger, named after this module via __name__.
log = logging.getLogger(__name__)
|
|
|
|
def tx_encode_coinbase_height(height: int) -> str:
    """
    Encode the block height for the coinbase scriptSig as required by BIP34.

    The returned hex matches the script fragment that ``CScript() << height``
    produces in Bitcoin Core, which is what the consensus check compares the
    start of the coinbase scriptSig against:

    * ``0``     -> ``OP_0`` (``00``)
    * ``1..16`` -> the single opcode ``OP_1``..``OP_16`` (``51``..``60``)
    * otherwise -> one length byte followed by the minimal little-endian
      CScriptNum bytes, padded with ``00`` when the top bit of the last byte
      is set so the number is not read as negative.

    Args:
        height: block height, must be >= 0.

    Returns:
        Hex string of the encoded height push.

    Raises:
        ValueError: if ``height`` is negative.
    """
    if height < 0:
        raise ValueError("Block height must be greater than or equal to 0.")
    if height == 0:
        return "00"
    if height <= 16:
        # Small integers are serialized as OP_1..OP_16 (0x51..0x60), not as
        # a one-byte data push.
        return f"{0x50 + height:02x}"

    payload = bytearray()
    remaining = height
    while remaining:
        payload.append(remaining & 0xff)
        remaining >>= 8
    # CScriptNum is sign-magnitude: a set top bit would mean "negative",
    # so add an explicit zero byte to keep the value positive.
    if payload[-1] & 0x80:
        payload.append(0x00)
    return f"{len(payload):02x}" + payload.hex()
|
|
|
|
def is_segwit_tx(raw_hex: str) -> bool:
    """
    Tell whether a hex-encoded transaction carries the SegWit marker and flag.

    The four hex characters that follow the 4-byte version (offsets 8..11)
    are compared with ``0001``. Input too short to contain them yields False.
    """
    marker_and_flag = raw_hex[8:12]
    # A slice shorter than four characters can never equal "0001", so this
    # comparison alone also rejects inputs of fewer than 12 characters.
    return marker_and_flag == "0001"
|
|
|
|
def build_coinbase_transaction(template, miner_script_pubkey, extranonce1, extranonce2, coinbase_message=None):
    """
    Build a coinbase transaction for mining.

    Args:
        template: mapping that provides ``height`` and ``coinbasevalue`` and,
            optionally, ``default_witness_commitment`` (hex). The commitment
            may be the complete output script or only the 32-byte commitment
            hash; in the latter case the ``6a24aa21a9ed`` header is added.
        miner_script_pubkey: hex scriptPubKey of the output receiving the
            reward.
        extranonce1: hex string appended to the scriptSig.
        extranonce2: hex string appended to the scriptSig after extranonce1.
        coinbase_message: optional text embedded in the scriptSig.

    Returns:
        A ``(coinbase_hex, txid)`` tuple. ``coinbase_hex`` is the full
        serialization (with marker, flag and witness when a witness
        commitment is present). ``txid`` is the byte-reversed double SHA-256,
        in hex, of the serialization without marker, flag and witness.

    Raises:
        ValueError: if the scriptSig is longer than 100 bytes.
    """
    height = template["height"]
    reward = template["coinbasevalue"]
    wc_raw = template.get("default_witness_commitment")  # full script or root only
    segwit = bool(wc_raw)

    tx_version = struct.pack("<I", 2).hex()  # coinbase tx version 2

    # ---- coinbase input ------------------------------------------------
    script_sig = tx_encode_coinbase_height(height)
    if coinbase_message:
        message_bytes = coinbase_message.encode()
        script_sig += "6a" + f"{len(message_bytes):02x}" + message_bytes.hex()
    # Add extranonce1 and extranonce2 as required by Stratum V1
    script_sig += extranonce1 + extranonce2

    if len(script_sig) // 2 > 100:
        raise ValueError("scriptSig > 100 byte")

    tx_inputs = (
        "01"              # input count
        + "00" * 32       # null previous txid
        + "ffffffff"      # previous output index
        + encode_varint(len(script_sig) // 2)
        + script_sig
        + "ffffffff"      # sequence
    )

    # ---- outputs -------------------------------------------------------
    outputs = [
        struct.pack("<Q", reward).hex()
        + encode_varint(len(miner_script_pubkey) // 2)
        + miner_script_pubkey
    ]
    if segwit:
        # A bare 32-byte root is exactly 64 hex characters and may itself
        # begin with "6a", so the prefix alone cannot identify a complete
        # script; a 64-character value is always treated as root only.
        is_full_script = wc_raw.startswith("6a") and len(wc_raw) != 64
        wc_script = wc_raw if is_full_script else "6a24aa21a9ed" + wc_raw
        outputs.append("00" * 8 + encode_varint(len(wc_script) // 2) + wc_script)
    tx_outputs = encode_varint(len(outputs)) + "".join(outputs)

    locktime = "00000000"

    # Serialization without marker/flag/witness: this is what the txid hashes.
    legacy_hex = tx_version + tx_inputs + tx_outputs + locktime

    if segwit:
        # Reserved witness value: one stack item of 32 zero bytes.
        witness = "01" + "20" + "00" * 32
        coinbase_hex = tx_version + "0001" + tx_inputs + tx_outputs + witness + locktime
    else:
        coinbase_hex = legacy_hex

    txid = double_sha256(unhexlify(legacy_hex))[::-1].hex()
    return coinbase_hex, txid
|
|
|
|
def calculate_merkle_root(coinbase_txid: str, transactions: list[dict]) -> str:
    """
    Compute the block Merkle root from the coinbase txid and the other
    transactions.

    Args:
        coinbase_txid: coinbase transaction id as displayed hex.
        transactions: transaction dicts; each must carry a ``txid`` key or,
            failing that, a ``hash`` key holding the transaction id in hex.

    Returns:
        The Merkle root as displayed hex (byte-reversed relative to the
        order used while hashing).

    Raises:
        KeyError: if a transaction has neither ``txid`` nor ``hash``.
    """
    # Leaves are hashed in internal byte order, so reverse the displayed hex.
    level = [unhexlify(coinbase_txid)[::-1]]
    for tx in transactions:
        # In BIP145 templates "hash" is the witness hash (wtxid); the header
        # Merkle tree commits to "txid". Fall back to "hash" only for
        # templates that do not provide "txid".
        tx_id = tx.get("txid") or tx["hash"]
        level.append(unhexlify(tx_id)[::-1])

    while len(level) > 1:
        if len(level) % 2:
            # Odd number of nodes: the last one is paired with itself.
            level.append(level[-1])
        level = [
            double_sha256(level[i] + level[i + 1])
            for i in range(0, len(level), 2)
        ]

    return hexlify(level[0][::-1]).decode()
|
|
|
|
def build_block_header(version, prev_hash, merkle_root, timestamp, bits, nonce):
    """
    Assemble the block header and return it hex-encoded.

    ``version``, ``timestamp`` and ``nonce`` are packed as unsigned 32-bit
    little-endian integers. ``prev_hash``, ``merkle_root`` and ``bits`` are
    hex strings whose bytes are reversed before being written. With 32-byte
    hashes and 4-byte bits the result is the usual 80-byte header.
    """
    fields = [
        struct.pack("<I", version),        # version
        unhexlify(prev_hash)[::-1],        # previous block hash
        unhexlify(merkle_root)[::-1],      # merkle root
        struct.pack("<I", timestamp),      # timestamp
        unhexlify(bits)[::-1],             # bits / target
        struct.pack("<I", nonce),          # nonce
    ]
    raw_header = b"".join(fields)
    return hexlify(raw_header).decode()
|
|
|
|
def serialize_block(header_hex, coinbase_tx, transactions):
    """
    Concatenate header, transaction count and transactions into one block
    hex string.

    The layout is: header, varint transaction count (coinbase included),
    coinbase transaction, then the ``data`` field of every entry in
    ``transactions`` in order.

    Returns the block hex, or None (after logging the error with its stack
    trace) when a transaction has no ``data`` key.
    """
    log.info("Block serialization started")

    # The coinbase is not part of `transactions`, so count it separately.
    num_tx = 1 + len(transactions)
    num_tx_hex = encode_varint(num_tx)

    try:
        transactions_hex = "".join([tx["data"] for tx in transactions])
    except KeyError:
        log.exception("A transaction is missing the 'data' field")
        return None

    block_hex = header_hex + num_tx_hex + coinbase_tx + transactions_hex

    log.info("Block serialized successfully - %d total transactions", num_tx)
    # The full hex can be thousands of characters, so keep it at DEBUG.
    log.debug("Block HEX: %s", block_hex)

    return block_hex
|